[Security][fix] To avoid possible sql injection via filters and or_filters parameters and tighten the field level checks (#5721)
* [fix] sanitize filters and or_filters to avoid sql injection * add test cases for filter sanitizer * codacy fix * added test cases to test valid scenarios
This commit is contained in:
parent
9e2e65f305
commit
e2b1ebe84c
3 changed files with 86 additions and 7 deletions
|
|
@ -140,7 +140,7 @@ class DatabaseQuery(object):
|
|||
|
||||
if self.or_conditions:
|
||||
args.conditions += (' or ' if args.conditions else "") + \
|
||||
' or '.join(self.or_conditions)
|
||||
' or '.join(self.or_conditions)
|
||||
|
||||
self.set_field_tables()
|
||||
|
||||
|
|
@ -190,7 +190,8 @@ class DatabaseQuery(object):
|
|||
As field contains `,` and mysql function `version()`, with the help of regex
|
||||
the system will filter out this field.
|
||||
'''
|
||||
regex = re.compile('^.*[,();].*')
|
||||
|
||||
sub_query_regex = re.compile("^.*[,();].*")
|
||||
blacklisted_keywords = ['select', 'create', 'insert', 'delete', 'drop', 'update', 'case']
|
||||
blacklisted_functions = ['concat', 'concat_ws', 'if', 'ifnull', 'nullif', 'coalesce',
|
||||
'connection_id', 'current_user', 'database', 'last_insert_id', 'session_user',
|
||||
|
|
@ -200,14 +201,22 @@ class DatabaseQuery(object):
|
|||
frappe.throw(_('Cannot use sub-query or function in fields'), frappe.DataError)
|
||||
|
||||
for field in self.fields:
|
||||
if regex.match(field):
|
||||
if any(keyword in field.lower() for keyword in blacklisted_keywords):
|
||||
if sub_query_regex.match(field):
|
||||
if any(keyword in field.lower().split() for keyword in blacklisted_keywords):
|
||||
_raise_exception()
|
||||
|
||||
if any("{0}(".format(keyword) in field.lower() \
|
||||
for keyword in blacklisted_functions):
|
||||
if any("({0}".format(keyword) in field.lower() for keyword in blacklisted_keywords):
|
||||
_raise_exception()
|
||||
|
||||
if any("{0}(".format(keyword) in field.lower() for keyword in blacklisted_functions):
|
||||
_raise_exception()
|
||||
|
||||
if re.compile("[a-zA-Z]+\s*'").match(field):
|
||||
_raise_exception()
|
||||
|
||||
if re.compile('[a-zA-Z]+\s*,').match(field):
|
||||
_raise_exception()
|
||||
|
||||
def extract_tables(self):
|
||||
"""extract tables from fields"""
|
||||
self.tables = ['`tab' + self.doctype + '`']
|
||||
|
|
|
|||
|
|
@ -117,6 +117,12 @@ class TestReportview(unittest.TestCase):
|
|||
fields=["name", "issingle, IF(issingle=1, (SELECT name from tabUser), count(*))"],
|
||||
limit_start=0, limit_page_length=1)
|
||||
|
||||
self.assertRaises(frappe.DataError, DatabaseQuery("DocType").execute,
|
||||
fields=["name", "issingle ''"],limit_start=0, limit_page_length=1)
|
||||
|
||||
self.assertRaises(frappe.DataError, DatabaseQuery("DocType").execute,
|
||||
fields=["name", "issingle,'"],limit_start=0, limit_page_length=1)
|
||||
|
||||
data = DatabaseQuery("DocType").execute(fields=["name", "issingle", "count(name)"],
|
||||
limit_start=0, limit_page_length=1)
|
||||
self.assertTrue('count(name)' in data[0])
|
||||
|
|
@ -133,6 +139,47 @@ class TestReportview(unittest.TestCase):
|
|||
"datediff(modified, creation) as date_diff"], limit_start=0, limit_page_length=1)
|
||||
self.assertTrue('date_diff' in data[0])
|
||||
|
||||
def test_filter_sanitizer(self):
|
||||
self.assertRaises(frappe.DataError, DatabaseQuery("DocType").execute,
|
||||
fields=["name"], filters={'istable,': 1}, limit_start=0, limit_page_length=1)
|
||||
|
||||
self.assertRaises(frappe.DataError, DatabaseQuery("DocType").execute,
|
||||
fields=["name"], filters={'editable_grid,': 1}, or_filters={'istable,': 1},
|
||||
limit_start=0, limit_page_length=1)
|
||||
|
||||
self.assertRaises(frappe.DataError, DatabaseQuery("DocType").execute,
|
||||
fields=["name"], filters={'editable_grid,': 1},
|
||||
or_filters=[['DocType', 'istable,', '=', 1]],
|
||||
limit_start=0, limit_page_length=1)
|
||||
|
||||
self.assertRaises(frappe.DataError, DatabaseQuery("DocType").execute,
|
||||
fields=["name"], filters={'editable_grid,': 1},
|
||||
or_filters=[['DocType', 'istable', '=', 1], ['DocType', 'beta and 1=1', '=', 0]],
|
||||
limit_start=0, limit_page_length=1)
|
||||
|
||||
data = DatabaseQuery("DocType").execute(fields=["name", "issingle"],
|
||||
filters={'editable_grid': 1}, or_filters=[['DocType', 'istable', '=', 1]],
|
||||
limit_start=0, limit_page_length=1)
|
||||
self.assertEquals('DocField', data[0]['name'])
|
||||
|
||||
data = DatabaseQuery("DocType").execute(fields=["name"],
|
||||
filters={'issingle': 1}, or_filters=[['DocType', 'module', '=', 'Core']],
|
||||
limit_start=0, limit_page_length=1)
|
||||
self.assertEquals('User Permission for Page and Report', data[0]['name'])
|
||||
|
||||
data = DatabaseQuery("DocType").execute(fields=["name"],
|
||||
filters={'track_changes': 1, 'module': 'Core'},
|
||||
limit_start=0, limit_page_length=4)
|
||||
self.assertEquals(['File', 'Version', 'Data Import', 'Domain Settings'],
|
||||
[d['name'] for d in data])
|
||||
|
||||
data = DatabaseQuery("DocType").execute(fields=["name"],
|
||||
filters=[['DocType', 'ifnull(track_changes, 0)', '=', 0],
|
||||
['DocType', 'module', '=', 'Core']],
|
||||
limit_start=0, limit_page_length=4)
|
||||
self.assertEquals(['DocField', 'User Permission for Page and Report', 'SMS Settings', 'Block Module'],
|
||||
[d['name'] for d in data])
|
||||
|
||||
def create_event(subject="_Test Event", starts_on=None):
|
||||
""" create a test event """
|
||||
|
||||
|
|
@ -145,4 +192,4 @@ def create_event(subject="_Test Event", starts_on=None):
|
|||
"starts_on": get_datetime(starts_on),
|
||||
}).insert(ignore_permissions=True)
|
||||
|
||||
return event
|
||||
return event
|
||||
|
|
|
|||
|
|
@ -801,6 +801,8 @@ def get_filter(doctype, f):
|
|||
|
||||
f = frappe._dict(doctype=f[0], fieldname=f[1], operator=f[2], value=f[3])
|
||||
|
||||
sanitize_column(f.fieldname)
|
||||
|
||||
if not f.operator:
|
||||
# if operator is missing
|
||||
f.operator = "="
|
||||
|
|
@ -840,6 +842,27 @@ def make_filter_dict(filters):
|
|||
|
||||
return _filter
|
||||
|
||||
def sanitize_column(column_name):
|
||||
from frappe import _
|
||||
regex = re.compile("^.*[,'();].*")
|
||||
blacklisted_keywords = ['select', 'create', 'insert', 'delete', 'drop', 'update', 'case', 'and', 'or']
|
||||
|
||||
def _raise_exception():
|
||||
frappe.throw(_("Invalid field name {0}").format(column_name), frappe.DataError)
|
||||
|
||||
if 'ifnull' in column_name:
|
||||
if regex.match(column_name):
|
||||
# to avoid and, or
|
||||
if any(' {0} '.format(keyword) in column_name.split() for keyword in blacklisted_keywords):
|
||||
_raise_exception()
|
||||
|
||||
# to avoid select, delete, drop, update and case
|
||||
elif any(keyword in column_name.split() for keyword in blacklisted_keywords):
|
||||
_raise_exception()
|
||||
|
||||
elif regex.match(column_name):
|
||||
_raise_exception()
|
||||
|
||||
def scrub_urls(html):
|
||||
html = expand_relative_urls(html)
|
||||
# encoding should be responsibility of the composer
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue