[enhance] Add condition in transitions and move workflow to server-side (#5399)

* [workflow] move to server side

* [workflow] tests

* [workflow] tests

* [minor] remove print

* [fixes] tests and lint

* [tests] fix typo

* [fix] tests
This commit is contained in:
Rushabh Mehta 2018-04-11 00:38:13 -08:00 committed by GitHub
parent cfa2006bf3
commit c4e1b9e0c1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 661 additions and 327 deletions

View file

@ -502,16 +502,15 @@ def clear_cache(user=None, doctype=None):
:param user: If user is given, only user cache is cleared.
:param doctype: If doctype is given, only DocType cache is cleared."""
import frappe.sessions
import frappe.cache_manager
if doctype:
import frappe.model.meta
frappe.model.meta.clear_cache(doctype)
frappe.cache_manager.clear_doctype_cache(doctype)
reset_metadata_version()
elif user:
frappe.sessions.clear_cache(user)
frappe.cache_manager.clear_user_cache(user)
else: # everything
from frappe import translate
frappe.sessions.clear_cache()
frappe.cache_manager.clear_user_cache()
translate.clear_cache()
reset_metadata_version()
local.cache = {}
@ -1489,7 +1488,7 @@ def safe_decode(param, encoding = 'utf-8'):
except Exception:
pass
return param
def parse_json(val):
from frappe.utils import parse_json
return parse_json(val)
@ -1505,7 +1504,7 @@ def mock(type, size = 1, locale = 'en'):
results.append(data)
from frappe.chat.util import squashify
results = squashify(results)
return results

77
frappe/cache_manager.py Normal file
View file

@ -0,0 +1,77 @@
# Copyright (c) 2018, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See license.txt
from __future__ import unicode_literals
import frappe
import frappe.defaults
from frappe.desk.notifications import delete_notification_count_for, clear_notifications
common_default_keys = ["__default", "__global"]
def clear_user_cache(user=None):
cache = frappe.cache()
groups = ("bootinfo", "user_recent", "roles", "user_doc", "lang",
"defaults", "user_permissions", "home_page", "linked_with",
"desktop_icons", 'portal_menu_items')
if user:
for name in groups:
cache.hdel(name, user)
cache.delete_keys("user:" + user)
clear_defaults_cache(user)
else:
for name in groups:
cache.delete_key(name)
clear_global_cache()
clear_defaults_cache()
clear_notifications(user)
def clear_global_cache():
clear_doctype_cache()
frappe.cache().delete_value(["app_hooks", "installed_apps",
"app_modules", "module_app", "notification_config", 'system_settings',
'scheduler_events', 'time_zone', 'webhooks', 'active_domains', 'active_modules'])
frappe.setup_module_map()
def clear_defaults_cache(user=None):
if user:
for p in ([user] + common_default_keys):
frappe.cache().hdel("defaults", p)
elif frappe.flags.in_install!="frappe":
frappe.cache().delete_key("defaults")
def clear_doctype_cache(doctype=None):
cache = frappe.cache()
if getattr(frappe.local, 'meta_cache') and (doctype in frappe.local.meta_cache):
del frappe.local.meta_cache[doctype]
for key in ('is_table', 'doctype_modules'):
cache.delete_value(key)
groups = ["meta", "form_meta", "table_columns", "last_modified",
"linked_doctypes", 'email_alerts', 'workflow']
def clear_single(dt):
for name in groups:
cache.hdel(name, dt)
if doctype:
clear_single(doctype)
# clear all parent doctypes
for dt in frappe.db.sql("""select parent from tabDocField
where fieldtype="Table" and options=%s""", (doctype,)):
clear_single(dt[0])
# clear all notifications
delete_notification_count_for(doctype)
else:
# clear all
for name in groups:
cache.delete_value(name)

View file

@ -10,7 +10,6 @@ from frappe.custom.doctype.custom_field.custom_field import create_custom_field
from frappe.model.document import Document
class DataMigrationPlan(Document):
def on_update(self):
# update custom fields in mappings
self.make_custom_fields_for_mappings()

View file

@ -4,12 +4,11 @@
from __future__ import unicode_literals
import frappe
from frappe.desk.notifications import clear_notifications
from frappe.cache_manager import clear_defaults_cache, common_default_keys
# Note: DefaultValue records are identified by parenttype
# __default, __global or 'User Permission'
common_keys = ["__default", "__global"]
def set_user_default(key, value, user=None, parenttype=None):
set_default(key, value, user or frappe.session.user, parenttype)
@ -154,10 +153,10 @@ def clear_default(key=None, value=None, parent=None, name=None, parenttype=None)
values.append(parenttype)
if parent:
clear_cache(parent)
clear_defaults_cache(parent)
else:
clear_cache("__default")
clear_cache("__global")
clear_defaults_cache("__default")
clear_defaults_cache("__global")
if not conditions:
raise Exception("[clear_default] No key specified.")
@ -194,15 +193,8 @@ def get_defaults_for(parent="__default"):
return defaults
def _clear_cache(parent):
if parent in common_keys:
if parent in common_default_keys:
frappe.clear_cache()
else:
clear_notifications(user=parent)
frappe.clear_cache(user=parent)
def clear_cache(user=None):
if user:
for p in ([user] + common_keys):
frappe.cache().hdel("defaults", p)
elif frappe.flags.in_install!="frappe":
frappe.cache().delete_key("defaults")

View file

@ -7,7 +7,6 @@ from __future__ import unicode_literals
import frappe, os
from frappe.model.meta import Meta
from frappe.modules import scrub, get_module_path, load_doctype_module
from frappe.model.workflow import get_workflow_name
from frappe.utils import get_html_format
from frappe.translate import make_dict_from_messages, extract_messages_from_code
from frappe.model.utils import render_include
@ -143,7 +142,7 @@ class FormMeta(Meta):
def load_workflows(self):
# get active workflow
workflow_name = get_workflow_name(self.name)
workflow_name = self.get_workflow()
workflow_docs = []
if workflow_name and frappe.db.exists("Workflow", workflow_name):

View file

@ -13,6 +13,7 @@ from six import iteritems, string_types
from werkzeug.exceptions import NotFound, Forbidden
import hashlib, json
from frappe.model import optional_fields
from frappe.model.workflow import validate_workflow
from frappe.utils.file_manager import save_url
from frappe.utils.global_search import update_global_search
from frappe.integrations.doctype.webhook import run_webhooks
@ -443,6 +444,7 @@ class Document(BaseDocument):
self._extract_images_from_text_editor()
self._sanitize_content()
self._save_passwords()
self.validate_workflow()
children = self.get_all_children()
for d in children:
@ -459,6 +461,11 @@ class Document(BaseDocument):
for fieldname in optional_fields:
self.set(fieldname, None)
def validate_workflow(self):
'''Validate if the workflow transition is valid'''
if self.meta.get_workflow():
validate_workflow(self)
def validate_set_only_once(self):
'''Validate that fields are not changed if not in insert'''
set_only_once_fields = self.meta.get_set_only_once_fields()
@ -873,7 +880,8 @@ class Document(BaseDocument):
self._doc_before_save = None
if not (self.is_new()
and (getattr(self.meta, 'track_changes', False)
or self.meta.get_set_only_once_fields())):
or self.meta.get_set_only_once_fields()
or self.meta.get_workflow())):
self.get_doc_before_save()
def run_post_save_methods(self):

View file

@ -24,6 +24,7 @@ from frappe.model.document import Document
from frappe.model.base_document import BaseDocument
from frappe.model.db_schema import type_map
from frappe.modules import load_doctype_module
from frappe.model.workflow import get_workflow_name
from frappe import _
def get_meta(doctype, cached=True):
@ -238,6 +239,9 @@ class Meta(Document):
field = self.get_field(fieldname)
return field and field.translatable
def get_workflow(self):
return get_workflow_name(self.name)
def process(self):
# don't process for special doctypes
# prevent's circular dependency
@ -510,36 +514,3 @@ def trim_tables(doctype=None):
query = """alter table `tab{doctype}` {columns}""".format(
doctype=doctype, columns=columns_to_remove)
frappe.db.sql_ddl(query)
def clear_cache(doctype=None):
cache = frappe.cache()
if getattr(frappe.local, 'meta_cache') and (doctype in frappe.local.meta_cache):
del frappe.local.meta_cache[doctype]
for key in ('is_table', 'doctype_modules'):
cache.delete_value(key)
groups = ["meta", "form_meta", "table_columns", "last_modified",
"linked_doctypes", 'email_alerts']
def clear_single(dt):
for name in groups:
cache.hdel(name, dt)
if doctype:
clear_single(doctype)
# clear all parent doctypes
for dt in frappe.db.sql("""select parent from tabDocField
where fieldtype="Table" and options=%s""", (doctype,)):
clear_single(dt[0])
# clear all notifications
from frappe.desk.notifications import delete_notification_count_for
delete_notification_count_for(doctype)
else:
# clear all
for name in groups:
cache.delete_value(name)

View file

@ -74,7 +74,6 @@ def get_doc_files(files, start_path, force=0, sync_everything = False, verbose=F
for doctype in document_types:
doctype_path = os.path.join(start_path, doctype)
if os.path.exists(doctype_path):
for docname in os.listdir(doctype_path):
if os.path.isdir(os.path.join(doctype_path, docname)):
doc_path = os.path.join(doctype_path, docname, docname) + ".json"

View file

@ -3,24 +3,133 @@
from __future__ import unicode_literals
import frappe
from frappe.utils import cint
from frappe import _
class WorkflowStateError(frappe.ValidationError): pass
class WorkflowTransitionError(frappe.ValidationError): pass
class WorkflowPermissionError(frappe.ValidationError): pass
def get_workflow_name(doctype):
if getattr(frappe.local, "workflow_names", None) is None:
frappe.local.workflow_names = {}
if doctype not in frappe.local.workflow_names:
workflow_name = frappe.cache().hget('workflow', doctype)
if workflow_name is None:
workflow_name = frappe.db.get_value("Workflow", {"document_type": doctype,
"is_active": 1}, "name")
frappe.cache().hset('workflow', doctype, workflow_name or '')
frappe.local.workflow_names[doctype] = workflow_name
return workflow_name
return frappe.local.workflow_names[doctype]
@frappe.whitelist()
def get_transitions(doc, workflow = None):
'''Return list of possible transitions for the given doc'''
doc = frappe.get_doc(frappe.parse_json(doc))
def get_default_state(doctype):
workflow_name = get_workflow_name(doctype)
return frappe.db.get_value("Workflow Document State", {"parent": workflow_name,
"idx":1}, "state")
if doc.is_new():
return []
def get_state_fieldname(doctype):
workflow_name = get_workflow_name(doctype)
return frappe.db.get_value("Workflow", workflow_name, "workflow_state_field")
frappe.has_permission(doc, 'read', throw=True)
roles = frappe.get_roles()
if not workflow:
workflow = get_workflow(doc.doctype)
current_state = doc.get(workflow.workflow_state_field)
if not current_state:
frappe.throw(_('Workflow State not set'), WorkflowStateError)
transitions = []
for transition in workflow.transitions:
if transition.state == current_state and transition.allowed in roles:
if transition.condition:
# if condition, evaluate
# access to frappe.db.get_value and frappe.db.get_list
success = frappe.safe_eval(transition.condition,
dict(frappe = frappe._dict(
db = frappe._dict(get_value = frappe.db.get_value, get_list=frappe.db.get_list),
session = frappe.session
)),
dict(doc = doc))
if not success:
continue
transitions.append(transition.as_dict())
return transitions
@frappe.whitelist()
def apply_workflow(doc, action):
'''Allow workflow action on the current doc'''
doc = frappe.get_doc(frappe.parse_json(doc))
workflow = get_workflow(doc.doctype)
transitions = get_transitions(doc, workflow)
# find the transition
transition = None
for t in transitions:
if t.action == action:
transition = t
if not transition:
frappe.throw(_("Not a valid Workflow Action"), WorkflowTransitionError)
# update workflow state field
doc.set(workflow.workflow_state_field, transition.next_state)
# find settings for the next state
next_state = [d for d in workflow.states if d.state == transition.next_state][0]
# update any additional field
if next_state.update_field:
doc.set(next_state.update_field, next_state.update_value)
new_docstatus = cint(next_state.doc_status)
if doc.docstatus == 0 and new_docstatus == 0:
doc.save()
elif doc.docstatus == 0 and new_docstatus == 1:
doc.submit()
elif doc.docstatus == 1 and new_docstatus == 1:
doc.save()
elif doc.docstatus == 1 and new_docstatus == 2:
doc.cancel()
else:
frappe.throw(_('Illegal Document Status for {0}').format(next_state.state))
doc.add_comment('Workflow', _(next_state.state))
return doc
def validate_workflow(doc):
'''Validate Workflow State and Transition for the current user.
- Check if user is allowed to edit in current state
- Check if user is allowed to transition to the next state (if changed)
'''
workflow = get_workflow(doc.doctype)
current_state = None
if getattr(doc, '_doc_before_save', None):
current_state = doc._doc_before_save.get(workflow.workflow_state_field)
next_state = doc.get(workflow.workflow_state_field)
if not next_state:
# set default state (maybe not set in insert)
current_state = next_state = workflow.states[0].state
doc.set(workflow.workflow_state_field, workflow.states[0].state)
state_row = [d for d in workflow.states if d.state == current_state]
if not state_row:
frappe.throw(_('{0} is not a valid Workflow State. Please update your Workflow and try again.'.format(frappe.bold(current_state))))
state_row = state_row[0]
# check if user is allowed to edit in current state
if not state_row.allow_edit in frappe.get_roles():
frappe.throw(_('Not allowed to edit in Workflow State {0}'.format(frappe.bold(current_state))), WorkflowPermissionError)
# if transitioning, check if user is allowed to transition
if current_state != next_state:
transitions = get_transitions(doc._doc_before_save)
transition = [d for d in transitions if d.next_state == next_state]
if not transition:
frappe.throw(_('Workflow State {0} is not allowed').format(frappe.bold(next_state)), WorkflowPermissionError)
def get_workflow(doctype):
return frappe.get_doc('Workflow', get_workflow_name(doctype))

View file

@ -26,22 +26,25 @@ frappe.ui.form.States = Class.extend({
var d = new frappe.ui.Dialog({
title: "Workflow: "
+ frappe.workflow.workflows[me.frm.doctype].name
})
var next_html = $.map(frappe.workflow.get_transitions(me.frm.doctype, state),
function(d) {
return d.action.bold() + __(" by Role ") + d.allowed;
}).join(", ") || __("None: End of Workflow").bold();
});
$(d.body).html("<p>"+__("Current status")+": " + state.bold() + "</p>"
+ "<p>"+__("Document is only editable by users of role")+": "
+ frappe.workflow.get_document_state(me.frm.doctype,
state).allow_edit.bold() + "</p>"
+ "<p>"+__("Next actions")+": "+ next_html +"</p>"
+ (me.frm.doc.__islocal ? ("<div class='alert alert-info'>"
+__("Workflow will start after saving.")+"</div>") : "")
+ "<p class='help'>"+__("Note: Other permission rules may also apply")+"</p>"
frappe.workflow.get_transitions(me.frm.doc).then((transitions) => {
var next_html = $.map(transitions,
function(d) {
return d.action.bold() + __(" by Role ") + d.allowed;
}).join(", ") || __("None: End of Workflow").bold();
$(d.body).html("<p>"+__("Current status")+": " + state.bold() + "</p>"
+ "<p>"+__("Document is only editable by users of role")+": "
+ frappe.workflow.get_document_state(me.frm.doctype,
state).allow_edit.bold() + "</p>"
+ "<p>"+__("Next actions")+": "+ next_html +"</p>"
+ (me.frm.doc.__islocal ? ("<div class='alert alert-info'>"
+__("Workflow will start after saving.")+"</div>") : "")
+ "<p class='help'>"+__("Note: Other permission rules may also apply")+"</p>"
).css({padding: '15px'});
d.show();
d.show();
});
}, true);
},
@ -61,7 +64,7 @@ frappe.ui.form.States = Class.extend({
}
},
show_actions: function(state) {
show_actions: function() {
var added = false,
me = this;
@ -72,57 +75,20 @@ frappe.ui.form.States = Class.extend({
return;
}
$.each(frappe.workflow.get_transitions(this.frm.doctype, state), function(i, d) {
if(frappe.user_roles.includes(d.allowed)) {
added = true;
me.frm.page.add_action_item(__(d.action), function() {
var action = d.action;
// capture current state
var doc_before_action = copy_dict(me.frm.doc);
// set new state
var next_state = frappe.workflow.get_next_state(me.frm.doctype,
me.frm.doc[me.state_fieldname], action);
me.frm.doc[me.state_fieldname] = next_state;
var new_state = frappe.workflow.get_document_state(me.frm.doctype, next_state);
var new_docstatus = cint(new_state.doc_status);
if(new_state.update_field) {
me.frm.set_value(new_state.update_field, new_state.update_value);
}
// revert state on error
var on_error = function() {
// reset in locals
frappe.model.add_to_locals(doc_before_action);
me.frm.refresh();
}
// success - add a comment
var success = function() {
me.frm.timeline.insert_comment("Workflow", next_state);
}
if(new_docstatus==1 && me.frm.doc.docstatus==0) {
me.frm.savesubmit(null, success, on_error);
} else if(new_docstatus==0 && me.frm.doc.docstatus==0) {
me.frm.save("Save", success, null, on_error);
} else if(new_docstatus==1 && me.frm.doc.docstatus==1) {
me.frm.save("Update", success, null, on_error);
} else if(new_docstatus==2 && me.frm.doc.docstatus==1) {
me.frm.savecancel(null, success, on_error);
} else {
frappe.msgprint(__("Document Status transition from ") + me.frm.doc.docstatus + " "
+ __("to") +
new_docstatus + " " + __("is not allowed."));
frappe.msgprint(__("Document Status transition from {0} to {1} is not allowed", [me.frm.doc.docstatus, new_docstatus]));
return false;
}
return false;
});
}
frappe.workflow.get_transitions(this.frm.doc).then(transitions => {
$.each(transitions, function(i, d) {
if(frappe.user_roles.includes(d.allowed)) {
added = true;
me.frm.page.add_action_item(__(d.action), function() {
frappe.xcall('frappe.model.workflow.apply_workflow',
{doc: me.frm.doc, action: d.action})
.then((doc) => {
frappe.model.sync(doc);
me.frm.refresh();
});
});
}
});
});
if(added) {
@ -144,11 +110,5 @@ frappe.ui.form.States = Class.extend({
this.set_default_state();
}
return this.frm.doc[this.state_fieldname];
},
bind_action: function() {
var me = this;
this.dropdown.on("click", "[data-action]", function() {
})
}
});

View file

@ -32,18 +32,14 @@ frappe.workflow = {
});
return value;
},
get_transitions: function(doctype, state) {
frappe.workflow.setup(doctype);
return frappe.get_children(frappe.workflow.workflows[doctype], "transitions", {state:state});
get_transitions: function(doc) {
frappe.workflow.setup(doc.doctype);
return frappe.xcall('frappe.model.workflow.get_transitions', {doc: doc});
},
get_document_state: function(doctype, state) {
frappe.workflow.setup(doctype);
return frappe.get_children(frappe.workflow.workflows[doctype], "states", {state:state})[0];
},
get_next_state: function(doctype, state, action) {
return frappe.get_children(frappe.workflow.workflows[doctype], "transitions", {
state:state, action:action})[0].next_state;
},
is_read_only: function(doctype, name) {
var state_fieldname = frappe.workflow.get_state_fieldname(doctype);
if(state_fieldname) {

View file

@ -8,6 +8,21 @@ frappe.request.url = '/';
frappe.request.ajax_count = 0;
frappe.request.waiting_for_ajax = [];
frappe.xcall = function(method, params) {
return new Promise((resolve, reject) => {
frappe.call({
method: method,
args: params,
callback: (r) => {
resolve(r.message);
},
error: (r) => {
reject(r.message);
}
});
});
};
// generic server call (call page, object)
frappe.call = function(opts) {
if (!frappe.is_online()) {

View file

@ -18,45 +18,17 @@ import frappe.translate
from frappe.utils.change_log import get_change_log
import redis
from six.moves.urllib.parse import unquote
from frappe.desk.notifications import clear_notifications
from six import text_type
from frappe.cache_manager import clear_global_cache, clear_user_cache
@frappe.whitelist()
def clear(user=None):
frappe.local.session_obj.update(force=True)
frappe.local.db.commit()
clear_cache(frappe.session.user)
clear_user_cache(frappe.session.user)
clear_global_cache()
frappe.response['message'] = _("Cache Cleared")
def clear_cache(user=None):
cache = frappe.cache()
groups = ("bootinfo", "user_recent", "roles", "user_doc", "lang",
"defaults", "user_permissions", "home_page", "linked_with",
"desktop_icons", 'portal_menu_items')
if user:
for name in groups:
cache.hdel(name, user)
cache.delete_keys("user:" + user)
frappe.defaults.clear_cache(user)
else:
for name in groups:
cache.delete_key(name)
clear_global_cache()
frappe.defaults.clear_cache()
clear_notifications(user)
def clear_global_cache():
frappe.model.meta.clear_cache()
frappe.cache().delete_value(["app_hooks", "installed_apps",
"app_modules", "module_app", "notification_config", 'system_settings',
'scheduler_events', 'time_zone', 'webhooks', 'active_domains', 'active_modules'])
frappe.setup_module_map()
def clear_sessions(user=None, keep_current=False, device=None, force=False):
'''Clear other sessions of the current user. Called at login / logout

View file

@ -209,7 +209,7 @@ class TestPermissions(unittest.TestCase):
frappe.set_user("test2@example.com")
frappe.model.meta.clear_cache("Blog Post")
frappe.clear_cache(doctype="Blog Post")
doc = frappe.get_doc("Blog Post", "-test-blog-post")
self.assertFalse(doc.has_permission("read"))
@ -217,7 +217,7 @@ class TestPermissions(unittest.TestCase):
doc = frappe.get_doc("Blog Post", "-test-blog-post-2")
self.assertTrue(doc.has_permission("read"))
frappe.model.meta.clear_cache("Blog Post")
frappe.clear_cache(doctype="Blog Post")
def if_owner_setup(self):
update('Blog Post', 'Blogger', 0, 'if_owner', 1)
@ -227,7 +227,7 @@ class TestPermissions(unittest.TestCase):
add_user_permission("Blogger", "_Test Blogger 1",
"test2@example.com")
frappe.model.meta.clear_cache("Blog Post")
frappe.clear_cache(doctype="Blog Post")
def test_insert_if_owner_with_user_permissions(self):
"""If `If Owner` is checked for a Role, check if that document

View file

@ -76,6 +76,11 @@ def enqueue_events_for_all_sites():
print(frappe.get_traceback())
def enqueue_events_for_site(site, queued_jobs):
def log_and_raise():
frappe.logger(__name__).error('Exception in Enqueue Events for Site {0}'.format(site) +
'\n' + frappe.get_traceback())
raise # pylint: disable=misplaced-bare-raise
try:
frappe.init(site=site)
if frappe.local.conf.maintenance_mode:
@ -91,11 +96,13 @@ def enqueue_events_for_site(site, queued_jobs):
enqueue_events(site=site, queued_jobs=queued_jobs)
frappe.logger(__name__).debug('Queued events for site {0}'.format(site))
except pymysql.OperationalError as e:
if e.args[0]==ER.ACCESS_DENIED_ERROR:
frappe.logger(__name__).debug('Access denied for site {0}'.format(site))
else:
log_and_raise()
except:
frappe.logger(__name__).error('Exception in Enqueue Events for Site {0}'.format(site) +
'\n' + frappe.get_traceback())
raise
log_and_raise()
finally:
frappe.destroy()

View file

@ -42,8 +42,7 @@ class PortalSettings(Document):
def clear_cache(self):
# make js and css
# clear web cache (for menus!)
from frappe.sessions import clear_cache
clear_cache('Guest')
frappe.clear_cache(user='Guest')
from frappe.website.render import clear_cache
clear_cache()

View file

@ -9,11 +9,10 @@ import frappe
from frappe.model.document import Document
class WebsiteScript(Document):
def on_update(self):
"""clear cache"""
from frappe.sessions import clear_cache
clear_cache('Guest')
frappe.clear_cache(user = 'Guest')
from frappe.website.render import clear_cache
clear_cache()

View file

@ -59,8 +59,7 @@ class WebsiteSettings(Document):
def clear_cache(self):
# make js and css
# clear web cache (for menus!)
from frappe.sessions import clear_cache
clear_cache('Guest')
frappe.clear_cache(user = 'Guest')
from frappe.website.render import clear_cache
clear_cache()

View file

@ -4,8 +4,92 @@ from __future__ import unicode_literals
import frappe
import unittest
test_records = frappe.get_test_records('Workflow')
from frappe.utils import random_string
from frappe.model.workflow import apply_workflow, WorkflowTransitionError, WorkflowPermissionError
class TestWorkflow(unittest.TestCase):
pass
def setUp(self):
if not getattr(self, 'workflow', None):
frappe.get_doc(dict(doctype='Role',
role_name='Test Approver')).insert(ignore_if_duplicate=True)
if frappe.db.exists('Workflow', 'Test ToDo'):
self.workflow = frappe.get_doc('Workflow', 'Test ToDo')
self.workflow.save()
else:
self.workflow = frappe.new_doc('Workflow')
self.workflow.workflow_name = 'Test ToDo'
self.workflow.document_type = 'ToDo'
self.workflow.workflow_state_field = 'workflow_state'
self.workflow.is_active = 1
self.workflow.append('states', dict(
state = 'Pending', allow_edit = 'All'
))
self.workflow.append('states', dict(
state = 'Approved', allow_edit = 'Test Approver',
update_field = 'status', update_value = 'Closed'
))
self.workflow.append('states', dict(
state = 'Rejected', allow_edit = 'Test Approver'
))
self.workflow.append('transitions', dict(
state = 'Pending', action='Approve', next_state = 'Approved', allowed='Test Approver'
))
self.workflow.append('transitions', dict(
state = 'Pending', action='Reject', next_state = 'Rejected', allowed='Test Approver'
))
self.workflow.append('transitions', dict(
state = 'Rejected', action='Review', next_state = 'Pending', allowed='All'
))
self.workflow.insert()
frappe.set_user('Administrator')
def test_default_condition(self):
'''test default condition is set'''
todo = frappe.get_doc(dict(doctype='ToDo', description='workflow ' + random_string(10))).insert()
# default condition is set
self.assertEqual(todo.workflow_state, 'Pending')
return todo
def test_approve(self):
'''test simple workflow'''
todo = self.test_default_condition()
apply_workflow(todo, 'Approve')
# default condition is set
self.assertEqual(todo.workflow_state, 'Approved')
self.assertEqual(todo.status, 'Closed')
return todo
def test_wrong_action(self):
'''Check illegal action (approve after reject)'''
todo = self.test_approve()
self.assertRaises(WorkflowTransitionError, apply_workflow, todo, 'Reject')
def test_workflow_role(self):
'''Check if user is allowed to edit in state via role'''
todo = self.test_approve()
todo.description = 'new'
frappe.set_user('test@example.com')
self.assertRaises(WorkflowPermissionError, todo.save, ignore_permissions=True)
frappe.set_user('Administrator')
def test_workflow_condition(self):
'''Test condition in transition'''
self.workflow.transitions[0].condition = 'doc.status == "Closed"'
self.workflow.save()
# only approve if status is closed
self.assertRaises(WorkflowTransitionError, self.test_approve)
self.workflow.transitions[0].condition = ''
self.workflow.save()

View file

@ -8,7 +8,6 @@ from frappe import _
from frappe.model.document import Document
class Workflow(Document):
def validate(self):
self.set_active()
self.create_custom_field_for_workflow_state()

View file

@ -1,142 +1,293 @@
{
"allow_copy": 0,
"allow_import": 0,
"allow_rename": 0,
"beta": 0,
"creation": "2013-02-22 01:27:36",
"custom": 0,
"description": "Defines actions on states and the next step and allowed roles.",
"docstatus": 0,
"doctype": "DocType",
"editable_grid": 1,
"allow_copy": 0,
"allow_guest_to_view": 0,
"allow_import": 0,
"allow_rename": 0,
"beta": 0,
"creation": "2013-02-22 01:27:36",
"custom": 0,
"description": "Defines actions on states and the next step and allowed roles.",
"docstatus": 0,
"doctype": "DocType",
"editable_grid": 1,
"fields": [
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "state",
"fieldtype": "Link",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 1,
"label": "State",
"length": 0,
"no_copy": 0,
"options": "Workflow State",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"print_width": "200px",
"read_only": 0,
"report_hide": 0,
"reqd": 1,
"search_index": 0,
"set_only_once": 0,
"unique": 0,
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "state",
"fieldtype": "Link",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 1,
"in_standard_filter": 0,
"label": "State",
"length": 0,
"no_copy": 0,
"options": "Workflow State",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"print_width": "200px",
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 1,
"search_index": 0,
"set_only_once": 0,
"translatable": 0,
"unique": 0,
"width": "200px"
},
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "action",
"fieldtype": "Link",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 1,
"label": "Action",
"length": 0,
"no_copy": 0,
"options": "Workflow Action",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"print_width": "200px",
"read_only": 0,
"report_hide": 0,
"reqd": 1,
"search_index": 0,
"set_only_once": 0,
"unique": 0,
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "action",
"fieldtype": "Link",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 1,
"in_standard_filter": 0,
"label": "Action",
"length": 0,
"no_copy": 0,
"options": "Workflow Action",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"print_width": "200px",
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 1,
"search_index": 0,
"set_only_once": 0,
"translatable": 0,
"unique": 0,
"width": "200px"
},
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "next_state",
"fieldtype": "Link",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 1,
"label": "Next State",
"length": 0,
"no_copy": 0,
"options": "Workflow State",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"print_width": "200px",
"read_only": 0,
"report_hide": 0,
"reqd": 1,
"search_index": 0,
"set_only_once": 0,
"unique": 0,
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "next_state",
"fieldtype": "Link",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 1,
"in_standard_filter": 0,
"label": "Next State",
"length": 0,
"no_copy": 0,
"options": "Workflow State",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"print_width": "200px",
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 1,
"search_index": 0,
"set_only_once": 0,
"translatable": 0,
"unique": 0,
"width": "200px"
},
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "allowed",
"fieldtype": "Link",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 1,
"label": "Allowed",
"length": 0,
"no_copy": 0,
"options": "Role",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"print_width": "200px",
"read_only": 0,
"report_hide": 0,
"reqd": 1,
"search_index": 0,
"set_only_once": 0,
"unique": 0,
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "allowed",
"fieldtype": "Link",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 1,
"in_standard_filter": 0,
"label": "Allowed",
"length": 0,
"no_copy": 0,
"options": "Role",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"print_width": "200px",
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 1,
"search_index": 0,
"set_only_once": 0,
"translatable": 0,
"unique": 0,
"width": "200px"
},
{
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "conditions",
"fieldtype": "Section Break",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 0,
"in_standard_filter": 0,
"label": "Conditions",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"translatable": 0,
"unique": 0
},
{
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "condition",
"fieldtype": "Code",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 0,
"in_standard_filter": 0,
"label": "Condition",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"translatable": 0,
"unique": 0
},
{
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "column_break_7",
"fieldtype": "Column Break",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 0,
"in_standard_filter": 0,
"length": 0,
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"translatable": 0,
"unique": 0
},
{
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "example",
"fieldtype": "HTML",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 0,
"in_standard_filter": 0,
"label": "Example",
"length": 0,
"no_copy": 0,
"options": "<pre><code>doc.grand_total &gt; 0</code></pre>\n\n<p>Conditions should be written in simple Python. Please use properties available in the form only.</p>",
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"translatable": 0,
"unique": 0
}
],
"hide_heading": 0,
"hide_toolbar": 0,
"idx": 1,
"image_view": 0,
"in_create": 0,
"in_dialog": 0,
"is_submittable": 0,
"issingle": 0,
"istable": 1,
"max_attachments": 0,
"modified": "2016-07-11 03:28:10.146195",
"modified_by": "Administrator",
"module": "Workflow",
"name": "Workflow Transition",
"owner": "Administrator",
"permissions": [],
"quick_entry": 0,
"read_only": 0,
"read_only_onload": 0,
],
"has_web_view": 0,
"hide_heading": 0,
"hide_toolbar": 0,
"idx": 1,
"image_view": 0,
"in_create": 0,
"is_submittable": 0,
"issingle": 0,
"istable": 1,
"max_attachments": 0,
"modified": "2018-04-04 23:41:58.233754",
"modified_by": "Administrator",
"module": "Workflow",
"name": "Workflow Transition",
"owner": "Administrator",
"permissions": [],
"quick_entry": 0,
"read_only": 0,
"read_only_onload": 0,
"show_name_in_global_search": 0,
"track_changes": 0,
"track_seen": 0
}