fix: Always validate file URLs (#12685)
* feat: validate file urls * fix: validate file url while downloading content * fix: cleaner validation * fix: added separate validation for web URLs Co-authored-by: Saurabh <saurabh6790@gmail.com> Co-authored-by: Suraj Shetty <13928957+surajshetty3416@users.noreply.github.com>
This commit is contained in:
parent
8cdb459895
commit
bbeb241232
2 changed files with 92 additions and 54 deletions
|
|
@ -94,52 +94,89 @@ class File(Document):
|
|||
self.set_file_name()
|
||||
self.validate_duplicate_entry()
|
||||
self.validate_attachment_limit()
|
||||
|
||||
self.validate_folder()
|
||||
|
||||
if not self.file_url and not self.flags.ignore_file_validate:
|
||||
if not self.is_folder:
|
||||
if self.is_folder:
|
||||
self.file_url = ""
|
||||
else:
|
||||
self.validate_url()
|
||||
|
||||
self.file_size = frappe.form_dict.file_size or self.file_size
|
||||
|
||||
def validate_url(self):
|
||||
if not self.file_url or self.file_url.startswith(("http://", "https://")):
|
||||
if not self.flags.ignore_file_validate:
|
||||
self.validate_file()
|
||||
self.generate_content_hash()
|
||||
|
||||
if frappe.db.exists('File', {'name': self.name, 'is_folder': 0}):
|
||||
old_file_url = self.file_url
|
||||
if not self.is_folder and (self.is_private != self.db_get('is_private')):
|
||||
private_files = frappe.get_site_path('private', 'files')
|
||||
public_files = frappe.get_site_path('public', 'files')
|
||||
return
|
||||
|
||||
file_name = self.file_url.split('/')[-1]
|
||||
if not self.is_private:
|
||||
shutil.move(os.path.join(private_files, file_name),
|
||||
os.path.join(public_files, file_name))
|
||||
# Probably an invalid web URL
|
||||
if not self.file_url.startswith(("/files/", "/private/files/")):
|
||||
frappe.throw(
|
||||
_("URL must start with http:// or https://"),
|
||||
title=_('Invalid URL')
|
||||
)
|
||||
|
||||
self.file_url = "/files/{0}".format(file_name)
|
||||
# Ensure correct formatting and type
|
||||
self.file_url = unquote(self.file_url)
|
||||
self.is_private = cint(self.is_private)
|
||||
|
||||
else:
|
||||
shutil.move(os.path.join(public_files, file_name),
|
||||
os.path.join(private_files, file_name))
|
||||
self.handle_is_private_changed()
|
||||
|
||||
self.file_url = "/private/files/{0}".format(file_name)
|
||||
base_path = os.path.realpath(get_files_path(is_private=self.is_private))
|
||||
if not os.path.realpath(self.get_full_path()).startswith(base_path):
|
||||
frappe.throw(
|
||||
_("The File URL you've entered is incorrect"),
|
||||
title=_('Invalid File URL')
|
||||
)
|
||||
|
||||
update_existing_file_docs(self)
|
||||
def handle_is_private_changed(self):
|
||||
if not frappe.db.exists(
|
||||
'File', {
|
||||
'name': self.name,
|
||||
'is_private': cint(not self.is_private)
|
||||
}
|
||||
):
|
||||
return
|
||||
|
||||
# update documents image url with new file url
|
||||
if self.attached_to_doctype and self.attached_to_name:
|
||||
if not self.attached_to_field:
|
||||
field_name = None
|
||||
reference_dict = frappe.get_doc(self.attached_to_doctype, self.attached_to_name).as_dict()
|
||||
for key, value in reference_dict.items():
|
||||
if value == old_file_url:
|
||||
field_name = key
|
||||
break
|
||||
self.attached_to_field = field_name
|
||||
if self.attached_to_field:
|
||||
frappe.db.set_value(self.attached_to_doctype, self.attached_to_name,
|
||||
self.attached_to_field, self.file_url)
|
||||
old_file_url = self.file_url
|
||||
|
||||
self.validate_url()
|
||||
file_name = self.file_url.split('/')[-1]
|
||||
private_file_path = frappe.get_site_path('private', 'files', file_name)
|
||||
public_file_path = frappe.get_site_path('public', 'files', file_name)
|
||||
|
||||
if self.file_url and (self.is_private != self.file_url.startswith('/private')):
|
||||
frappe.throw(_('Invalid file URL. Please contact System Administrator.'))
|
||||
if self.is_private:
|
||||
shutil.move(public_file_path, private_file_path)
|
||||
url_starts_with = "/private/files/"
|
||||
else:
|
||||
shutil.move(private_file_path, public_file_path)
|
||||
url_starts_with = "/files/"
|
||||
|
||||
self.file_url = "{0}{1}".format(url_starts_with, file_name)
|
||||
update_existing_file_docs(self)
|
||||
|
||||
if (
|
||||
not self.attached_to_doctype
|
||||
or not self.attached_to_name
|
||||
or not self.fetch_attached_to_field(old_file_url)
|
||||
):
|
||||
return
|
||||
|
||||
frappe.db.set_value(self.attached_to_doctype, self.attached_to_name,
|
||||
self.attached_to_field, self.file_url)
|
||||
|
||||
def fetch_attached_to_field(self, old_file_url):
|
||||
if self.attached_to_field:
|
||||
return True
|
||||
|
||||
reference_dict = frappe.get_doc(
|
||||
self.attached_to_doctype, self.attached_to_name).as_dict()
|
||||
|
||||
for key, value in reference_dict.items():
|
||||
if value == old_file_url:
|
||||
self.attached_to_field = key
|
||||
return True
|
||||
|
||||
def validate_attachment_limit(self):
|
||||
attachment_limit = 0
|
||||
|
|
@ -335,8 +372,13 @@ class File(Document):
|
|||
|
||||
def get_content(self):
|
||||
"""Returns [`file_name`, `content`] for given file name `fname`"""
|
||||
if self.is_folder:
|
||||
frappe.throw(_("Cannot get file contents of a Folder"))
|
||||
|
||||
if self.get('content'):
|
||||
return self.content
|
||||
|
||||
self.validate_url()
|
||||
file_path = self.get_full_path()
|
||||
|
||||
# read the file
|
||||
|
|
@ -423,23 +465,6 @@ class File(Document):
|
|||
else:
|
||||
raise Exception
|
||||
|
||||
|
||||
def validate_url(self, df=None):
|
||||
if self.file_url:
|
||||
if not self.file_url.startswith(("http://", "https://", "/files/", "/private/files/")):
|
||||
frappe.throw(_("URL must start with 'http://' or 'https://'"))
|
||||
return
|
||||
|
||||
if not self.file_url.startswith(("http://", "https://")):
|
||||
# local file
|
||||
root_files_path = get_files_path(is_private=self.is_private)
|
||||
if not os.path.commonpath([root_files_path]) == os.path.commonpath([root_files_path, self.get_full_path()]):
|
||||
# basically the file url is skewed to not point to /files/ or /private/files
|
||||
frappe.throw(_("{0} is not a valid file url").format(self.file_url))
|
||||
self.file_url = unquote(self.file_url)
|
||||
self.file_size = frappe.form_dict.file_size or self.file_size
|
||||
|
||||
|
||||
def get_uploaded_content(self):
|
||||
# should not be unicode when reading a file, hence using frappe.form
|
||||
if 'filedata' in frappe.form_dict:
|
||||
|
|
|
|||
|
|
@ -192,13 +192,10 @@ class TestSameContent(unittest.TestCase):
|
|||
|
||||
|
||||
class TestFile(unittest.TestCase):
|
||||
|
||||
|
||||
def setUp(self):
|
||||
self.delete_test_data()
|
||||
self.upload_file()
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
try:
|
||||
frappe.get_doc("File", {"file_name": "file_copy.txt"}).delete()
|
||||
|
|
@ -352,6 +349,22 @@ class TestFile(unittest.TestCase):
|
|||
self.assertEqual(file1.file_url, file2.file_url)
|
||||
self.assertTrue(os.path.exists(file2.get_full_path()))
|
||||
|
||||
def test_parent_directory_validation_in_file_url(self):
|
||||
file1 = frappe.get_doc({
|
||||
"doctype": "File",
|
||||
"file_name": 'parent_dir.txt',
|
||||
"attached_to_doctype": "",
|
||||
"attached_to_name": "",
|
||||
"is_private": 1,
|
||||
"content": test_content1}).insert()
|
||||
|
||||
file1.file_url = '/private/files/../test.txt'
|
||||
self.assertRaises(frappe.exceptions.ValidationError, file1.save)
|
||||
|
||||
# No validation to see if file exists
|
||||
file1.reload()
|
||||
file1.file_url = '/private/files/parent_dir2.txt'
|
||||
file1.save()
|
||||
|
||||
class TestAttachment(unittest.TestCase):
|
||||
test_doctype = 'Test For Attachment'
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue