Merge pull request #13996 from surajshetty3416/core-test-coverage-1

This commit is contained in:
Suraj Shetty 2021-08-24 11:47:47 +05:30 committed by GitHub
commit 5bf5e70991
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 453 additions and 530 deletions

View file

@ -29,4 +29,5 @@ def make_access_log(doctype=None, document=None, method=None, file_type=None,
doc.insert(ignore_permissions=True)
# `frappe.db.commit` added because insert doesnt `commit` when called in GET requests like `printview`
frappe.db.commit()
if frappe.request and frappe.request.method == 'GET':
frappe.db.commit()

View file

@ -21,11 +21,11 @@ import zipfile
import requests
import requests.exceptions
from PIL import Image, ImageFile, ImageOps
from io import StringIO
from io import BytesIO
from urllib.parse import quote, unquote
import frappe
from frappe import _, conf
from frappe import _, conf, safe_decode
from frappe.model.document import Document
from frappe.utils import call_hook_method, cint, cstr, encode, get_files_path, get_hook_method, random_string, strip
from frappe.utils.image import strip_exif_data, optimize_image
@ -257,8 +257,7 @@ class File(Document):
with open(get_files_path(file_name, is_private=self.is_private), "rb") as f:
self.content_hash = get_content_hash(f.read())
except IOError:
frappe.msgprint(_("File {0} does not exist").format(self.file_url))
raise
frappe.throw(_("File {0} does not exist").format(self.file_url))
def on_trash(self):
if self.is_home_folder or self.is_attachments_folder:
@ -270,16 +269,12 @@ class File(Document):
def make_thumbnail(self, set_as_thumbnail=True, width=300, height=300, suffix="small", crop=False):
if self.file_url:
if self.file_url.startswith("/files"):
try:
try:
if self.file_url.startswith(("/files", "/private/files")):
image, filename, extn = get_local_image(self.file_url)
except IOError:
return
else:
try:
else:
image, filename, extn = get_web_image(self.file_url)
except (requests.exceptions.HTTPError, requests.exceptions.SSLError, IOError, TypeError):
except (requests.exceptions.HTTPError, requests.exceptions.SSLError, IOError, TypeError):
return
size = width, height
@ -289,16 +284,13 @@ class File(Document):
image.thumbnail(size, Image.ANTIALIAS)
thumbnail_url = filename + "_" + suffix + "." + extn
path = os.path.abspath(frappe.get_site_path("public", thumbnail_url.lstrip("/")))
try:
image.save(path)
if set_as_thumbnail:
self.db_set("thumbnail_url", thumbnail_url)
self.db_set("thumbnail_url", thumbnail_url)
except IOError:
frappe.msgprint(_("Unable to write file format for {0}").format(path))
return
@ -326,12 +318,10 @@ class File(Document):
def unzip(self):
'''Unzip current file and replace it by its children'''
if not ".zip" in self.file_name:
frappe.msgprint(_("Not a zip file"))
return
if not self.file_url.endswith(".zip"):
frappe.throw(_("{0} is not a zip file").format(self.file_name))
zip_path = frappe.get_site_path(self.file_url.strip('/'))
base_url = os.path.dirname(self.file_url)
zip_path = self.get_full_path()
files = []
with zipfile.ZipFile(zip_path) as z:
@ -359,10 +349,6 @@ class File(Document):
return files
def get_file_url(self):
data = frappe.db.get_value("File", self.file_data_name, ["file_name", "file_url"], as_dict=True)
return data.file_url or data.file_name
def exists_on_disk(self):
exists = os.path.exists(self.get_full_path())
return exists
@ -431,47 +417,6 @@ class File(Document):
return get_files_path(self.file_name, is_private=self.is_private)
def get_file_doc(self):
'''returns File object (Document) from given parameters or form_dict'''
r = frappe.form_dict
if self.file_url is None: self.file_url = r.file_url
if self.file_name is None: self.file_name = r.file_name
if self.attached_to_doctype is None: self.attached_to_doctype = r.doctype
if self.attached_to_name is None: self.attached_to_name = r.docname
if self.attached_to_field is None: self.attached_to_field = r.docfield
if self.folder is None: self.folder = r.folder
if self.is_private is None: self.is_private = r.is_private
if r.filedata:
file_doc = self.save_uploaded()
elif r.file_url:
file_doc = self.save()
return file_doc
def save_uploaded(self):
self.content = self.get_uploaded_content()
if self.content:
return self.save()
else:
raise Exception
def get_uploaded_content(self):
# should not be unicode when reading a file, hence using frappe.form
if 'filedata' in frappe.form_dict:
if "," in frappe.form_dict.filedata:
frappe.form_dict.filedata = frappe.form_dict.filedata.rsplit(",", 1)[1]
frappe.uploaded_content = base64.b64decode(frappe.form_dict.filedata)
return frappe.uploaded_content
elif self.content:
return self.content
frappe.msgprint(_('No file attached'))
return None
def save_file(self, content=None, decode=False, ignore_existing_file_check=False):
file_exists = False
self.content = content
@ -539,14 +484,6 @@ class File(Document):
'file_url': self.file_url
}
def get_file_data_from_hash(self):
for name in frappe.db.sql_list("select name from `tabFile` where content_hash=%s and is_private=%s",
(self.content_hash, self.is_private)):
b = frappe.get_doc('File', name)
return {k: b.get(k) for k in frappe.get_hooks()['write_file_keys']}
return False
def check_max_file_size(self):
max_file_size = get_max_file_size()
file_size = len(self.content)
@ -621,7 +558,8 @@ def create_new_folder(file_name, folder):
file.file_name = file_name
file.is_folder = 1
file.folder = folder
file.insert()
file.insert(ignore_if_duplicate=True)
return file
@frappe.whitelist()
def move_file(file_list, new_parent, old_parent):
@ -672,7 +610,7 @@ def get_local_image(file_url):
try:
image = Image.open(file_path)
except IOError:
frappe.msgprint(_("Unable to read file format for {0}").format(file_url), raise_exception=True)
frappe.throw(_("Unable to read file format for {0}").format(file_url))
content = None
@ -704,7 +642,7 @@ def get_web_image(file_url):
raise
try:
image = Image.open(StringIO(frappe.safe_decode(r.content)))
image = Image.open(BytesIO(r.content))
except Exception as e:
frappe.msgprint(_("Image link '{0}' is not valid").format(file_url), raise_exception=e)
@ -740,48 +678,12 @@ def delete_file(path):
os.remove(path)
def remove_file(fid=None, attached_to_doctype=None, attached_to_name=None, from_delete=False, delete_permanently=False):
"""Remove file and File entry"""
file_name = None
if not (attached_to_doctype and attached_to_name):
attached = frappe.db.get_value("File", fid,
["attached_to_doctype", "attached_to_name", "file_name"])
if attached:
attached_to_doctype, attached_to_name, file_name = attached
ignore_permissions, comment = False, None
if attached_to_doctype and attached_to_name and not from_delete:
doc = frappe.get_doc(attached_to_doctype, attached_to_name)
ignore_permissions = doc.has_permission("write") or False
if frappe.flags.in_web_form:
ignore_permissions = True
if not file_name:
file_name = frappe.db.get_value("File", fid, "file_name")
comment = doc.add_comment("Attachment Removed", _("Removed {0}").format(file_name))
frappe.delete_doc("File", fid, ignore_permissions=ignore_permissions, delete_permanently=delete_permanently)
return comment
def get_max_file_size():
return cint(conf.get('max_file_size')) or 10485760
def remove_all(dt, dn, from_delete=False, delete_permanently=False):
"""remove all files in a transaction"""
try:
for fid in frappe.db.sql_list("""select name from `tabFile` where
attached_to_doctype=%s and attached_to_name=%s""", (dt, dn)):
if from_delete:
# If deleting a doc, directly delete files
frappe.delete_doc("File", fid, ignore_permissions=True, delete_permanently=delete_permanently)
else:
# Removes file and adds a comment in the document it is attached to
remove_file(fid=fid, attached_to_doctype=dt, attached_to_name=dn,
from_delete=from_delete, delete_permanently=delete_permanently)
except Exception as e:
if e.args[0]!=1054: raise # (temp till for patched)
def has_permission(doc, ptype=None, user=None):
has_access = False
@ -886,15 +788,13 @@ def extract_images_from_html(doc, content):
if b"," in content:
content = content.split(b",")[1]
content = base64.b64decode(content)
content = optimize_image(content, mtype)
if "filename=" in headers:
filename = headers.split("filename=")[-1]
filename = safe_decode(filename).split(";")[0]
# decode filename
if not isinstance(filename, str):
filename = str(filename, 'utf-8')
else:
filename = get_random_filename(content_type=mtype)
@ -922,12 +822,9 @@ def extract_images_from_html(doc, content):
return content
def get_random_filename(extn=None, content_type=None):
if extn:
if not extn.startswith("."):
extn = "." + extn
elif content_type:
def get_random_filename(content_type=None):
extn = None
if content_type:
extn = mimetypes.guess_extension(content_type)
return random_string(7) + (extn or "")
@ -938,7 +835,7 @@ def unzip_file(name):
'''Unzip the given file and make file records for each of the extracted files'''
file_obj = frappe.get_doc('File', name)
files = file_obj.unzip()
return len(files)
return files
@frappe.whitelist()
def optimize_saved_image(doc_name):
@ -979,13 +876,6 @@ def get_attached_images(doctype, names):
return out
@frappe.whitelist()
def validate_filename(filename):
from frappe.utils import now_datetime
timestamp = now_datetime().strftime(" %Y-%m-%d %H:%M:%S")
fname = get_file_name(filename, timestamp)
return fname
@frappe.whitelist()
def get_files_in_folder(folder, start=0, page_length=20):
start = cint(start)

View file

@ -2,11 +2,12 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# See license.txt
import base64
import json
import frappe
import os
import unittest
from frappe import _
from frappe.core.doctype.file.file import move_file, get_files_in_folder
from frappe.core.doctype.file.file import get_attached_images, move_file, get_files_in_folder, unzip_file
from frappe.utils import get_files_path
# test_records = frappe.get_test_records('File')
@ -365,6 +366,80 @@ class TestFile(unittest.TestCase):
file1.file_url = '/private/files/parent_dir2.txt'
file1.save()
def test_file_url_validation(self):
test_file = frappe.get_doc({
"doctype": "File",
"file_name": 'logo',
"file_url": 'https://frappe.io/files/frappe.png'
})
self.assertIsNone(test_file.validate())
# bad path
test_file.file_url = "/usr/bin/man"
self.assertRaisesRegex(frappe.exceptions.ValidationError, "URL must start with http:// or https://", test_file.validate)
test_file.file_url = None
test_file.file_name = "/usr/bin/man"
self.assertRaisesRegex(frappe.exceptions.ValidationError, "There is some problem with the file url", test_file.validate)
test_file.file_url = None
test_file.file_name = "_file"
self.assertRaisesRegex(IOError, "does not exist", test_file.validate)
test_file.file_url = None
test_file.file_name = "/private/files/_file"
self.assertRaisesRegex(IOError, "does not exist", test_file.validate)
def test_make_thumbnail(self):
# test web image
test_file = frappe.get_doc({
"doctype": "File",
"file_name": 'logo',
"file_url": frappe.utils.get_url('/_test/assets/image.jpg'),
}).insert(ignore_permissions=True)
test_file.make_thumbnail()
self.assertEquals(test_file.thumbnail_url, '/files/image_small.jpg')
# test local image
test_file.db_set('thumbnail_url', None)
test_file.reload()
test_file.file_url = "/files/image_small.jpg"
test_file.make_thumbnail(suffix="xs", crop=True)
self.assertEquals(test_file.thumbnail_url, '/files/image_small_xs.jpg')
frappe.clear_messages()
test_file.db_set('thumbnail_url', None)
test_file.reload()
test_file.file_url = frappe.utils.get_url('unknown.jpg')
test_file.make_thumbnail(suffix="xs")
self.assertEqual(json.loads(frappe.message_log[0]), {"message": f"File '{frappe.utils.get_url('unknown.jpg')}' not found"})
self.assertEquals(test_file.thumbnail_url, None)
def test_file_unzip(self):
file_path = frappe.get_app_path('frappe', 'www/_test/assets/file.zip')
public_file_path = frappe.get_site_path('public', 'files')
try:
import shutil
shutil.copy(file_path, public_file_path)
except Exception:
pass
test_file = frappe.get_doc({
"doctype": "File",
"file_url": '/files/file.zip',
}).insert(ignore_permissions=True)
self.assertListEqual([file.file_name for file in unzip_file(test_file.name)],
['css_asset.css', 'image.jpg', 'js_asset.min.js'])
test_file = frappe.get_doc({
"doctype": "File",
"file_url": frappe.utils.get_url('/_test/assets/image.jpg'),
}).insert(ignore_permissions=True)
self.assertRaisesRegex(frappe.exceptions.ValidationError, 'not a zip file', test_file.unzip)
class TestAttachment(unittest.TestCase):
test_doctype = 'Test For Attachment'
@ -469,3 +544,28 @@ class TestAttachmentsAccess(unittest.TestCase):
frappe.set_user('Administrator')
frappe.db.rollback()
class TestFileUtils(unittest.TestCase):
def test_extract_images_from_doc(self):
# with filename in data URI
todo = frappe.get_doc({
"doctype": "ToDo",
"description": 'Test <img src="data:image/png;filename=pix.png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNkYAAAAAYAAjCB0C8AAAAASUVORK5CYII=">'
}).insert()
self.assertTrue(frappe.db.exists("File", {"attached_to_name": todo.name}))
self.assertIn('<img src="/files/pix.png">', todo.description)
self.assertListEqual(get_attached_images('ToDo', [todo.name])[todo.name], ['/files/pix.png'])
# without filename in data URI
todo = frappe.get_doc({
"doctype": "ToDo",
"description": 'Test <img src="data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNkYAAAAAYAAjCB0C8AAAAASUVORK5CYII=">'
}).insert()
filename = frappe.db.exists("File", {"attached_to_name": todo.name})
self.assertIn(f'<img src="{frappe.get_doc("File", filename).file_url}', todo.description)
def test_create_new_folder(self):
from frappe.core.doctype.file.file import create_new_folder
folder = create_new_folder('test_folder', 'Home')
self.assertTrue(folder.is_folder)

View file

@ -11,8 +11,6 @@ from frappe.desk.query_report import generate_report_result
from frappe.model.document import Document
from frappe.utils import gzip_compress, gzip_decompress
from frappe.utils.background_jobs import enqueue
from frappe.core.doctype.file.file import remove_all
class PreparedReport(Document):
def before_insert(self):

View file

@ -3,6 +3,7 @@
# For license information, please see license.txt
from frappe.model.document import Document
import frappe
class RoleProfile(Document):
def autoname(self):
@ -11,5 +12,9 @@ class RoleProfile(Document):
def on_update(self):
""" Changes in role_profile reflected across all its user """
from frappe.core.doctype.user.user import update_roles
update_roles(self.name)
users = frappe.get_all('User', filters={'role_profile_name': self.name})
roles = [role.role for role in self.roles]
for d in users:
user = frappe.get_doc('User', d)
user.set('roles', [])
user.add_roles(*roles)

View file

@ -8,6 +8,7 @@ test_dependencies = ['Role']
class TestRoleProfile(unittest.TestCase):
def test_make_new_role_profile(self):
frappe.delete_doc_if_exists('Role Profile', 'Test 1', force=1)
new_role_profile = frappe.get_doc(dict(doctype='Role Profile', role_profile='Test 1')).insert()
self.assertEqual(new_role_profile.role_profile, 'Test 1')
@ -19,7 +20,25 @@ class TestRoleProfile(unittest.TestCase):
new_role_profile.save()
self.assertEqual(new_role_profile.roles[0].role, '_Test Role 2')
# user with a role profile
random_user = frappe.mock("email")
random_user_name = frappe.mock("name")
random_user = frappe.get_doc({
"doctype": "User",
"email": random_user,
"enabled": 1,
"first_name": random_user_name,
"new_password": "Eastern_43A1W",
"role_profile_name": 'Test 1'
}).insert(ignore_permissions=True, ignore_if_duplicate=True)
self.assertListEqual([role.role for role in random_user.roles], [role.role for role in new_role_profile.roles])
# clear roles
new_role_profile.roles = []
new_role_profile.save()
self.assertEqual(new_role_profile.roles, [])
# user roles with the role profile should also be updated
random_user.reload()
self.assertListEqual(random_user.roles, [])

View file

@ -70,5 +70,19 @@
"role": "System Manager"
}
]
}
},
{
"doctype": "User",
"email": "testpassword@example.com",
"enabled": 1,
"first_name": "_Test",
"new_password": "Eastern_43A1W",
"roles": [
{
"doctype": "Has Role",
"parentfield": "roles",
"role": "System Manager"
}
]
}
]

View file

@ -1,16 +1,18 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See license.txt
import frappe, unittest, uuid
import json
import unittest
from unittest.mock import patch
from frappe.model.delete_doc import delete_doc
from frappe.utils.data import today, add_to_date
from frappe import _dict
from frappe.utils import get_url
from frappe.core.doctype.user.user import get_total_users
from frappe.core.doctype.user.user import MaxUsersReachedError, test_password_strength
from frappe.core.doctype.user.user import extract_mentions
import frappe
import frappe.exceptions
from frappe.core.doctype.user.user import (extract_mentions, reset_password,
sign_up, test_password_strength, update_password, verify_password)
from frappe.frappeclient import FrappeClient
from frappe.model.delete_doc import delete_doc
from frappe.utils import get_url
user_module = frappe.core.doctype.user.user
test_records = frappe.get_test_records('User')
class TestUser(unittest.TestCase):
@ -23,7 +25,7 @@ class TestUser(unittest.TestCase):
def test_user_type(self):
new_user = frappe.get_doc(dict(doctype='User', email='test-for-type@example.com',
first_name='Tester')).insert()
first_name='Tester')).insert(ignore_if_duplicate=True)
self.assertEqual(new_user.user_type, 'Website User')
# social login userid for frappe
@ -119,40 +121,9 @@ class TestUser(unittest.TestCase):
# system manager now added by Administrator
self.assertTrue("System Manager" in [d.role for d in me.get("roles")])
# def test_deny_multiple_sessions(self):
# from frappe.installer import update_site_config
# clear_limit('users')
#
# # allow one session
# user = frappe.get_doc('User', 'test@example.com')
# user.simultaneous_sessions = 1
# user.new_password = 'Eastern_43A1W'
# user.save()
#
# def test_request(conn):
# value = conn.get_value('User', 'first_name', {'name': 'test@example.com'})
# self.assertTrue('first_name' in value)
#
# from frappe.frappeclient import FrappeClient
# update_site_config('deny_multiple_sessions', 0)
#
# conn1 = FrappeClient(get_url(), "test@example.com", "Eastern_43A1W", verify=False)
# test_request(conn1)
#
# conn2 = FrappeClient(get_url(), "test@example.com", "Eastern_43A1W", verify=False)
# test_request(conn2)
#
# update_site_config('deny_multiple_sessions', 1)
# conn3 = FrappeClient(get_url(), "test@example.com", "Eastern_43A1W", verify=False)
# test_request(conn3)
#
# # first connection should fail
# test_request(conn1)
def test_delete_user(self):
new_user = frappe.get_doc(dict(doctype='User', email='test-for-delete@example.com',
first_name='Tester Delete User')).insert()
first_name='Tester Delete User')).insert(ignore_if_duplicate=True)
self.assertEqual(new_user.user_type, 'Website User')
# role with desk access
@ -174,7 +145,7 @@ class TestUser(unittest.TestCase):
self.assertFalse(frappe.db.exists('User', new_user.name))
def test_password_strength(self):
# Test Password without Password Strenth Policy
# Test Password without Password Strength Policy
frappe.db.set_value("System Settings", "System Settings", "enable_password_policy", 0)
# password policy is disabled, test_password_strength should be ignored
@ -193,6 +164,17 @@ class TestUser(unittest.TestCase):
result = test_password_strength("Eastern_43A1W")
self.assertEqual(result['feedback']['password_policy_validation_passed'], True)
# test password strength while saving user with new password
user = frappe.get_doc("User", "test@example.com")
frappe.flags.in_test = False
user.new_password = "password"
self.assertRaisesRegex(frappe.exceptions.ValidationError, "Invalid Password", user.save)
user.reload()
user.new_password = "Eastern_43A1W"
user.save()
frappe.flags.in_test = True
def test_comment_mentions(self):
comment = '''
<span class="mention" data-id="test.comment@example.com" data-value="Test" data-denotation-char="@">
@ -227,6 +209,7 @@ class TestUser(unittest.TestCase):
self.assertEqual(extract_mentions(comment)[0], "test_user@example.com")
self.assertEqual(extract_mentions(comment)[1], "test.again@example1.com")
frappe.delete_doc("User Group", "Team")
doc = frappe.get_doc({
'doctype': 'User Group',
'name': 'Team',
@ -236,14 +219,18 @@ class TestUser(unittest.TestCase):
'user': 'test1@example.com'
}]
})
doc.insert(ignore_if_duplicate=True)
doc.insert()
comment = '''
<div>
Testing comment for
<span class="mention" data-id="Team" data-value="Team" data-is-group="true" data-denotation-char="@">
<span><span class="ql-mention-denotation-char">@</span>Team</span>
</span>
</span> and
<span class="mention" data-id="Unknown Team" data-value="Unknown Team" data-is-group="true" data-denotation-char="@">
<span><span class="ql-mention-denotation-char">@</span>Unknown Team</span>
</span><!-- this should be ignored-->
please check
</div>
'''
@ -267,31 +254,124 @@ class TestUser(unittest.TestCase):
self.assertEqual(res1.status_code, 200)
self.assertEqual(res2.status_code, 417)
# def test_user_rollback(self):
# """
# FIXME: This is failing with PR #12693 as Rollback can't happen if notifications sent on user creation.
# Make sure that notifications disabled.
# """
# frappe.db.commit()
# frappe.db.begin()
# user_id = str(uuid.uuid4())
# email = f'{user_id}@example.com'
# try:
# frappe.flags.in_import = True # disable throttling
# frappe.get_doc(dict(
# doctype='User',
# email=email,
# first_name=user_id,
# )).insert()
# finally:
# frappe.flags.in_import = False
def test_user_rename(self):
old_name = "test_user_rename@example.com"
new_name = "test_user_rename_new@example.com"
user = frappe.get_doc({
"doctype": "User",
"email": old_name,
"enabled": 1,
"first_name": "_Test",
"new_password": "Eastern_43A1W",
"roles": [
{
"doctype": "Has Role",
"parentfield": "roles",
"role": "System Manager"
}]
}).insert(ignore_permissions=True, ignore_if_duplicate=True)
# # Check user has been added
# self.assertIsNotNone(frappe.db.get("User", {"email": email}))
frappe.rename_doc('User', user.name, new_name)
self.assertTrue(frappe.db.exists("Notification Settings", new_name))
frappe.delete_doc("User", new_name)
def test_signup(self):
import frappe.website.utils
random_user = frappe.mock('email')
random_user_name = frappe.mock('name')
# disabled signup
with patch.object(user_module, "is_signup_disabled", return_value=True):
self.assertRaisesRegex(frappe.exceptions.ValidationError, "Sign Up is disabled",
sign_up, random_user, random_user_name, "/signup")
self.assertTupleEqual(sign_up(random_user, random_user_name, "/welcome"), (1, "Please check your email for verification"))
self.assertEqual(frappe.cache().hget('redirect_after_login', random_user), "/welcome")
# re-register
self.assertTupleEqual(sign_up(random_user, random_user_name, "/welcome"), (0, "Already Registered"))
# disabled user
user = frappe.get_doc("User", random_user)
user.enabled = 0
user.save()
self.assertTupleEqual(sign_up(random_user, random_user_name, "/welcome"), (0, "Registered but disabled"))
# throttle user creation
with patch.object(user_module.frappe.db, "get_creation_count", return_value=301):
self.assertRaisesRegex(frappe.exceptions.ValidationError, "Throttled",
sign_up, frappe.mock('email'), random_user_name, "/signup")
def test_reset_password(self):
from frappe.auth import CookieManager, LoginManager
from frappe.utils import set_request
old_password = "Eastern_43A1W"
new_password = "easy_password"
set_request(path="/random")
frappe.local.cookie_manager = CookieManager()
frappe.local.login_manager = LoginManager()
frappe.set_user("testpassword@example.com")
test_user = frappe.get_doc("User", "testpassword@example.com")
test_user.reset_password()
self.assertEqual(update_password(new_password, key=test_user.reset_password_key), "/app")
self.assertEqual(update_password(new_password, key="wrong_key"), "The Link specified has either been used before or Invalid")
# password verification should fail with old password
self.assertRaises(frappe.exceptions.AuthenticationError, verify_password, old_password)
verify_password(new_password)
# reset password
update_password(old_password, old_password=new_password)
self.assertRaisesRegex(frappe.exceptions.ValidationError, "Invalid key type", update_password, "test", 1, ['like', '%'])
password_strength_response = {
"feedback": {
"password_policy_validation_passed": False,
"suggestions": ["Fix password"]
}
}
# password strength failure test
with patch.object(user_module, "test_password_strength", return_value=password_strength_response):
self.assertRaisesRegex(frappe.exceptions.ValidationError, "Fix password", update_password, new_password, 0, test_user.reset_password_key)
# test redirect URL for website users
frappe.set_user("test2@example.com")
self.assertEqual(update_password(new_password, old_password=old_password), "/")
# reset password
update_password(old_password, old_password=new_password)
# test API endpoint
with patch.object(user_module.frappe, 'sendmail') as sendmail:
frappe.clear_messages()
test_user = frappe.get_doc("User", "test2@example.com")
self.assertEqual(reset_password(user="test2@example.com"), None)
test_user.reload()
self.assertEqual(update_password(new_password, key=test_user.reset_password_key), "/")
update_password(old_password, old_password=new_password)
self.assertEqual(json.loads(frappe.message_log[0]), {"message": "Password reset instructions have been sent to your email"})
sendmail.assert_called_once()
self.assertEqual(sendmail.call_args[1]["recipients"], "test2@example.com")
self.assertEqual(reset_password(user="test2@example.com"), None)
self.assertEqual(reset_password(user="Administrator"), "not allowed")
self.assertEqual(reset_password(user="random"), "not found")
def test_user_onload_modules(self):
from frappe.config import get_modules_from_all_apps
from frappe.desk.form.load import getdoc
frappe.response.docs = []
getdoc("User", "Administrator")
doc = frappe.response.docs[0]
self.assertListEqual(doc.get("__onload").get('all_modules', []),
[m.get("module_name") for m in get_modules_from_all_apps()])
# # Check that rollback works
# frappe.db.rollback()
# self.assertIsNone(frappe.db.get("User", {"email": email}))
def delete_contact(user):
frappe.db.delete("Contact", {"email_id": user})

View file

@ -166,7 +166,7 @@ frappe.ui.form.on('User', {
frm.add_custom_button(__("Reset OTP Secret"), function() {
frappe.call({
method: "frappe.core.doctype.user.user.reset_otp_secret",
method: "frappe.twofactor.reset_otp_secret",
args: {
"user": frm.doc.name
}

View file

@ -15,17 +15,11 @@ from frappe.desk.doctype.notification_settings.notification_settings import crea
from frappe.utils.user import get_system_managers
from frappe.website.utils import is_signup_disabled
from frappe.rate_limiter import rate_limit
from frappe.utils.background_jobs import enqueue
from frappe.core.doctype.user_type.user_type import user_linked_with_permission_on_doctype
STANDARD_USERS = ("Guest", "Administrator")
class MaxUsersReachedError(frappe.ValidationError):
pass
class User(Document):
__new_password = None
@ -56,8 +50,6 @@ class User(Document):
frappe.cache().delete_key('enabled_users')
def validate(self):
self.check_demo()
# clear new password
self.__new_password = self.new_password
self.new_password = ""
@ -137,10 +129,6 @@ class User(Document):
"""Returns true if current user is the session user"""
return self.name == frappe.session.user
def check_demo(self):
if frappe.session.user == 'demo@erpnext.com':
frappe.throw(_('Cannot change user details in demo. Please signup for a new account at https://erpnext.com'), title=_('Not Allowed'))
def set_full_name(self):
self.full_name = " ".join(filter(None, [self.first_name, self.last_name]))
@ -398,7 +386,6 @@ class User(Document):
def before_rename(self, old_name, new_name, merge=False):
self.check_demo()
frappe.clear_cache(user=old_name)
self.validate_rename(old_name, new_name)
@ -718,85 +705,6 @@ def get_email_awaiting(user):
where parent = %(user)s""",{"user":user})
return False
@frappe.whitelist(allow_guest=False)
def set_email_password(email_account, user, password):
account = frappe.get_doc("Email Account", email_account)
if account.awaiting_password:
account.awaiting_password = 0
account.password = password
try:
account.save(ignore_permissions=True)
except Exception:
frappe.db.rollback()
return False
return True
def setup_user_email_inbox(email_account, awaiting_password, email_id, enable_outgoing):
""" setup email inbox for user """
def add_user_email(user):
user = frappe.get_doc("User", user)
row = user.append("user_emails", {})
row.email_id = email_id
row.email_account = email_account
row.awaiting_password = awaiting_password or 0
row.enable_outgoing = enable_outgoing or 0
user.save(ignore_permissions=True)
udpate_user_email_settings = False
if not all([email_account, email_id]):
return
user_names = frappe.db.get_values("User", { "email": email_id }, as_dict=True)
if not user_names:
return
for user in user_names:
user_name = user.get("name")
# check if inbox is alreay configured
user_inbox = frappe.db.get_value("User Email", {
"email_account": email_account,
"parent": user_name
}, ["name"]) or None
if not user_inbox:
add_user_email(user_name)
else:
# update awaiting password for email account
udpate_user_email_settings = True
if udpate_user_email_settings:
frappe.db.sql("""UPDATE `tabUser Email` SET awaiting_password = %(awaiting_password)s,
enable_outgoing = %(enable_outgoing)s WHERE email_account = %(email_account)s""", {
"email_account": email_account,
"enable_outgoing": enable_outgoing,
"awaiting_password": awaiting_password or 0
})
else:
users = " and ".join([frappe.bold(user.get("name")) for user in user_names])
frappe.msgprint(_("Enabled email inbox for user {0}").format(users))
ask_pass_update()
def remove_user_email_inbox(email_account):
""" remove user email inbox settings if email account is deleted """
if not email_account:
return
users = frappe.get_all("User Email", filters={
"email_account": email_account
}, fields=["parent as name"])
for user in users:
doc = frappe.get_doc("User", user.get("name"))
to_remove = [ row for row in doc.user_emails if row.email_account == email_account ]
[ doc.remove(row) for row in to_remove ]
doc.save(ignore_permissions=True)
def ask_pass_update():
# update the sys defaults as to awaiting users
from frappe.utils import set_default
@ -809,24 +717,19 @@ def ask_pass_update():
def _get_user_for_update_password(key, old_password):
# verify old password
result = frappe._dict()
if key:
user = frappe.db.get_value("User", {"reset_password_key": key})
if not user:
return {
'message': _("The Link specified has either been used before or Invalid")
}
result.user = frappe.db.get_value("User", {"reset_password_key": key})
if not result.user:
result.message = _("The Link specified has either been used before or Invalid")
elif old_password:
# verify old password
frappe.local.login_manager.check_password(frappe.session.user, old_password)
user = frappe.session.user
result.user = user
else:
return
return {
'user': user
}
return result
def reset_user_data(user):
user_doc = frappe.get_doc("User", user)
@ -848,14 +751,12 @@ def sign_up(email, full_name, redirect_to):
user = frappe.db.get("User", {"email": email})
if user:
if user.disabled:
return 0, _("Registered but disabled")
else:
if user.enabled:
return 0, _("Already Registered")
else:
return 0, _("Registered but disabled")
else:
if frappe.db.sql("""select count(*) from tabUser where
HOUR(TIMEDIFF(CURRENT_TIMESTAMP, TIMESTAMP(modified)))=1""")[0][0] > 300:
if frappe.db.get_creation_count('User', 60) > 300:
frappe.respond_as_web_page(_('Temporarily Disabled'),
_('Too many users signed up recently, so the registration is disabled. Please try back in an hour'),
http_status_code=429)
@ -1048,91 +949,6 @@ def update_gravatar(name):
if gravatar:
frappe.db.set_value('User', name, 'user_image', gravatar)
@frappe.whitelist(allow_guest=True)
def send_token_via_sms(tmp_id,phone_no=None,user=None):
try:
from frappe.core.doctype.sms_settings.sms_settings import send_request
except:
return False
if not frappe.cache().ttl(tmp_id + '_token'):
return False
ss = frappe.get_doc('SMS Settings', 'SMS Settings')
if not ss.sms_gateway_url:
return False
token = frappe.cache().get(tmp_id + '_token')
args = {ss.message_parameter: 'verification code is {}'.format(token)}
for d in ss.get("parameters"):
args[d.parameter] = d.value
if user:
user_phone = frappe.db.get_value('User', user, ['phone','mobile_no'], as_dict=1)
usr_phone = user_phone.mobile_no or user_phone.phone
if not usr_phone:
return False
else:
if phone_no:
usr_phone = phone_no
else:
return False
args[ss.receiver_parameter] = usr_phone
status = send_request(ss.sms_gateway_url, args, use_post=ss.use_post)
if 200 <= status < 300:
frappe.cache().delete(tmp_id + '_token')
return True
else:
return False
@frappe.whitelist(allow_guest=True)
def send_token_via_email(tmp_id,token=None):
import pyotp
user = frappe.cache().get(tmp_id + '_user')
count = token or frappe.cache().get(tmp_id + '_token')
if ((not user) or (user == 'None') or (not count)):
return False
user_email = frappe.db.get_value('User',user, 'email')
if not user_email:
return False
otpsecret = frappe.cache().get(tmp_id + '_otp_secret')
hotp = pyotp.HOTP(otpsecret)
frappe.sendmail(
recipients=user_email,
sender=None,
subject="Verification Code",
template="verification_code",
args=dict(code=hotp.at(int(count))),
delayed=False,
retry=3
)
return True
@frappe.whitelist(allow_guest=True)
def reset_otp_secret(user):
otp_issuer = frappe.db.get_value('System Settings', 'System Settings', 'otp_issuer_name')
user_email = frappe.db.get_value('User',user, 'email')
if frappe.session.user in ["Administrator", user] :
frappe.defaults.clear_default(user + '_otplogin')
frappe.defaults.clear_default(user + '_otpsecret')
email_args = {
'recipients':user_email, 'sender':None, 'subject':'OTP Secret Reset - {}'.format(otp_issuer or "Frappe Framework"),
'message':'<p>Your OTP secret on {} has been reset. If you did not perform this reset and did not request it, please contact your System Administrator immediately.</p>'.format(otp_issuer or "Frappe Framework"),
'delayed':False,
'retry':3
}
enqueue(method=frappe.sendmail, queue='short', timeout=300, event=None, is_async=True, job_name=None, now=False, **email_args)
return frappe.msgprint(_("OTP Secret has been reset. Re-registration will be required on next login."))
else:
return frappe.throw(_("OTP secret can only be reset by the Administrator."))
def throttle_user_creation():
if frappe.flags.in_import:
return
@ -1150,15 +966,6 @@ def get_module_profile(module_profile):
module_profile = frappe.get_doc('Module Profile', {'module_profile_name': module_profile})
return module_profile.get('block_modules')
def update_roles(role_profile):
users = frappe.get_all('User', filters={'role_profile_name': role_profile})
role_profile = frappe.get_doc('Role Profile', role_profile)
roles = [role.role for role in role_profile.roles]
for d in users:
user = frappe.get_doc('User', d)
user.set('roles', [])
user.add_roles(*roles)
def create_contact(user, ignore_links=False, ignore_mandatory=False):
from frappe.contacts.doctype.contact.contact import get_contact_name
if user.name in ["Administrator", "Guest"]: return
@ -1217,18 +1024,18 @@ def generate_keys(user):
:param user: str
"""
if "System Manager" in frappe.get_roles():
user_details = frappe.get_doc("User", user)
api_secret = frappe.generate_hash(length=15)
# if api key is not set generate api key
if not user_details.api_key:
api_key = frappe.generate_hash(length=15)
user_details.api_key = api_key
user_details.api_secret = api_secret
user_details.save()
frappe.only_for("System Manager")
user_details = frappe.get_doc("User", user)
api_secret = frappe.generate_hash(length=15)
# if api key is not set generate api key
if not user_details.api_key:
api_key = frappe.generate_hash(length=15)
user_details.api_key = api_key
user_details.api_secret = api_secret
user_details.save()
return {"api_secret": api_secret}
return {"api_secret": api_secret}
frappe.throw(frappe._("Not Permitted"), frappe.PermissionError)
@frappe.whitelist()
def switch_theme(theme):

View file

@ -2,6 +2,7 @@
# Copyright (c) 2019, Frappe Technologies and Contributors
# See license.txt
import frappe
from frappe.core.doctype.user.user import get_system_users
from frappe.desk.form.assign_to import add as assign_task
import unittest
@ -54,7 +55,4 @@ def get_todo():
return frappe.get_cached_doc('ToDo', res[0].name)
def get_user():
users = frappe.db.get_all('User',
filters={'name': ('not in', ['Administrator', 'Guest'])},
fields='name', limit=1)
return users[0].name
return get_system_users(limit=1)[0]

View file

@ -5,7 +5,7 @@ import frappe, json
import frappe.desk.form.meta
import frappe.desk.form.load
from frappe.desk.form.document_follow import follow_document
from frappe.utils.file_manager import extract_images_from_html
from frappe.core.doctype.file.file import extract_images_from_html
from frappe import _

View file

@ -137,8 +137,6 @@ class EmailAccount(Document):
def on_update(self):
"""Check there is only one default of each type."""
from frappe.core.doctype.user.user import setup_user_email_inbox
self.check_automatic_linking_email_account()
self.there_must_be_only_one_default()
setup_user_email_inbox(email_account=self.name, awaiting_password=self.awaiting_password,
@ -532,8 +530,6 @@ class EmailAccount(Document):
def on_trash(self):
"""Clear communications where email account is linked"""
from frappe.core.doctype.user.user import remove_user_email_inbox
frappe.db.sql("update `tabCommunication` set email_account='' where email_account=%s", self.name)
remove_user_email_inbox(email_account=self.name)
@ -724,3 +720,84 @@ def get_max_email_uid(email_account):
else:
max_uid = cint(result[0].get("uid", 0)) + 1
return max_uid
def setup_user_email_inbox(email_account, awaiting_password, email_id, enable_outgoing):
""" setup email inbox for user """
from frappe.core.doctype.user.user import ask_pass_update
def add_user_email(user):
user = frappe.get_doc("User", user)
row = user.append("user_emails", {})
row.email_id = email_id
row.email_account = email_account
row.awaiting_password = awaiting_password or 0
row.enable_outgoing = enable_outgoing or 0
user.save(ignore_permissions=True)
update_user_email_settings = False
if not all([email_account, email_id]):
return
user_names = frappe.db.get_values("User", {"email": email_id}, as_dict=True)
if not user_names:
return
for user in user_names:
user_name = user.get("name")
# check if inbox is alreay configured
user_inbox = frappe.db.get_value("User Email", {
"email_account": email_account,
"parent": user_name
}, ["name"]) or None
if not user_inbox:
add_user_email(user_name)
else:
# update awaiting password for email account
update_user_email_settings = True
if update_user_email_settings:
frappe.db.sql("""UPDATE `tabUser Email` SET awaiting_password = %(awaiting_password)s,
enable_outgoing = %(enable_outgoing)s WHERE email_account = %(email_account)s""", {
"email_account": email_account,
"enable_outgoing": enable_outgoing,
"awaiting_password": awaiting_password or 0
})
else:
users = " and ".join([frappe.bold(user.get("name")) for user in user_names])
frappe.msgprint(_("Enabled email inbox for user {0}").format(users))
ask_pass_update()
def remove_user_email_inbox(email_account):
""" remove user email inbox settings if email account is deleted """
if not email_account:
return
users = frappe.get_all("User Email", filters={
"email_account": email_account
}, fields=["parent as name"])
for user in users:
doc = frappe.get_doc("User", user.get("name"))
to_remove = [row for row in doc.user_emails if row.email_account == email_account]
[doc.remove(row) for row in to_remove]
doc.save(ignore_permissions=True)
@frappe.whitelist(allow_guest=False)
def set_email_password(email_account, user, password):
account = frappe.get_doc("Email Account", email_account)
if account.awaiting_password:
account.awaiting_password = 0
account.password = password
try:
account.save(ignore_permissions=True)
except Exception:
frappe.db.rollback()
return False
return True

View file

@ -10,7 +10,7 @@ import frappe.model.meta
from frappe import _
from frappe import get_module_path
from frappe.model.dynamic_links import get_dynamic_link_map
from frappe.core.doctype.file.file import remove_all
from frappe.utils.file_manager import remove_all
from frappe.utils.password import delete_all_passwords_for
from frappe.model.naming import revert_series_if_last
from frappe.utils.global_search import delete_for_document

View file

@ -230,7 +230,7 @@ frappe.Application = class Application {
s.fields_dict.checking.$wrapper.html('<i class="fa fa-spinner fa-spin fa-4x"></i>');
s.show();
frappe.call({
method: 'frappe.core.doctype.user.user.set_email_password',
method: 'frappe.email.doctype.email_account.email_account.set_email_password',
args: {
"email_account": email_account[i]["email_account"],
"user": user,

View file

@ -103,7 +103,7 @@ def rate_limit(key: str, limit: Union[int, Callable] = 5, seconds: int= 24*60*60
def wrapper(*args, **kwargs):
# Do not apply rate limits if method is not opted to check
if methods != 'ALL' and frappe.request.method.upper() not in methods:
return frappe.call(fun, **frappe.form_dict)
return frappe.call(fun, **frappe.form_dict or kwargs)
_limit = limit() if callable(limit) else limit
@ -118,6 +118,6 @@ def rate_limit(key: str, limit: Union[int, Callable] = 5, seconds: int= 24*60*60
if value > _limit:
frappe.throw(_("You hit the rate limit because of too many requests. Please try after sometime."))
return frappe.call(fun, **frappe.form_dict)
return frappe.call(fun, **frappe.form_dict or kwargs)
return wrapper
return ratelimit_decorator

View file

@ -398,3 +398,23 @@ def should_remove_barcode_image(barcode):
def disable():
frappe.db.set_value('System Settings', None, 'enable_two_factor_auth', 0)
@frappe.whitelist()
def reset_otp_secret(user):
otp_issuer = frappe.db.get_value('System Settings', 'System Settings', 'otp_issuer_name')
user_email = frappe.db.get_value('User', user, 'email')
if frappe.session.user in ["Administrator", user] :
frappe.defaults.clear_default(user + '_otplogin')
frappe.defaults.clear_default(user + '_otpsecret')
email_args = {
'recipients': user_email,
'sender': None,
'subject': _('OTP Secret Reset - {0}').format(otp_issuer or "Frappe Framework"),
'message': _('<p>Your OTP secret on {0} has been reset. If you did not perform this reset and did not request it, please contact your System Administrator immediately.</p>').format(otp_issuer or "Frappe Framework"),
'delayed':False,
'retry':3
}
enqueue(method=frappe.sendmail, queue='short', timeout=300, event=None, is_async=True, job_name=None, now=False, **email_args)
return frappe.msgprint(_("OTP Secret has been reset. Re-registration will be required on next login."))
else:
return frappe.throw(_("OTP secret can only be reset by the Administrator."))

View file

@ -6,16 +6,7 @@ import json
import csv
import requests
from io import StringIO
from frappe.utils import encode, cstr, cint, flt, comma_or
def read_csv_content_from_uploaded_file(ignore_encoding=False):
if getattr(frappe, "uploaded_file", None):
with open(frappe.uploaded_file, "r") as upfile:
fcontent = upfile.read()
else:
_file = frappe.new_doc("File")
fcontent = _file.get_uploaded_content()
return read_csv_content(fcontent, ignore_encoding)
from frappe.utils import cstr, cint, flt, comma_or
def read_csv_content_from_attached_file(doc):
fileid = frappe.get_all("File", fields = ["name"], filters = {"attached_to_doctype": doc.doctype,

View file

@ -213,28 +213,22 @@ def write_file(content, fname, is_private=0):
return get_files_path(fname, is_private=is_private)
def remove_all(dt, dn, from_delete=False):
def remove_all(dt, dn, from_delete=False, delete_permanently=False):
"""remove all files in a transaction"""
try:
for fid in frappe.db.sql_list("""select name from `tabFile` where
attached_to_doctype=%s and attached_to_name=%s""", (dt, dn)):
remove_file(fid, dt, dn, from_delete)
if from_delete:
# If deleting a doc, directly delete files
frappe.delete_doc("File", fid, ignore_permissions=True, delete_permanently=delete_permanently)
else:
# Removes file and adds a comment in the document it is attached to
remove_file(fid=fid, attached_to_doctype=dt, attached_to_name=dn,
from_delete=from_delete, delete_permanently=delete_permanently)
except Exception as e:
if e.args[0]!=1054: raise # (temp till for patched)
def remove_file_by_url(file_url, doctype=None, name=None):
if doctype and name:
fid = frappe.db.get_value("File", {"file_url": file_url,
"attached_to_doctype": doctype, "attached_to_name": name})
else:
fid = frappe.db.get_value("File", {"file_url": file_url})
if fid:
return remove_file(fid)
def remove_file(fid, attached_to_doctype=None, attached_to_name=None, from_delete=False):
def remove_file(fid=None, attached_to_doctype=None, attached_to_name=None, from_delete=False, delete_permanently=False):
"""Remove file and File entry"""
file_name = None
if not (attached_to_doctype and attached_to_name):
@ -252,8 +246,7 @@ def remove_file(fid, attached_to_doctype=None, attached_to_name=None, from_delet
if not file_name:
file_name = frappe.db.get_value("File", fid, "file_name")
comment = doc.add_comment("Attachment Removed", _("Removed {0}").format(file_name))
frappe.delete_doc("File", fid, ignore_permissions=ignore_permissions)
frappe.delete_doc("File", fid, ignore_permissions=ignore_permissions, delete_permanently=delete_permanently)
return comment
@ -372,76 +365,6 @@ def download_file(file_url):
frappe.local.response.filecontent = filedata
frappe.local.response.type = "download"
def extract_images_from_doc(doc, fieldname):
content = doc.get(fieldname)
content = extract_images_from_html(doc, content)
if frappe.flags.has_dataurl:
doc.set(fieldname, content)
def extract_images_from_html(doc, content):
frappe.flags.has_dataurl = False
def _save_file(match):
data = match.group(1)
data = data.split("data:")[1]
headers, content = data.split(",")
mtype = headers.split(";")[0]
if isinstance(content, str):
content = content.encode("utf-8")
if b"," in content:
content = content.split(b",")[1]
content = base64.b64decode(content)
content = optimize_image(content, mtype)
if "filename=" in headers:
filename = headers.split("filename=")[-1]
# decode filename
if not isinstance(filename, str):
filename = str(filename, 'utf-8')
else:
filename = get_random_filename(content_type=mtype)
doctype = doc.parenttype if doc.parent else doc.doctype
name = doc.parent or doc.name
if doc.doctype == "Comment":
doctype = doc.reference_doctype
name = doc.reference_name
# TODO fix this
file_url = save_file(filename, content, doctype, name, decode=False).get("file_url")
if not frappe.flags.has_dataurl:
frappe.flags.has_dataurl = True
return '<img src="{file_url}"'.format(file_url=file_url)
if content:
content = re.sub(r'<img[^>]*src\s*=\s*["\'](?=data:)(.*?)["\']', _save_file, content)
return content
def get_random_filename(extn=None, content_type=None):
if extn:
if not extn.startswith("."):
extn = "." + extn
elif content_type:
extn = mimetypes.guess_extension(content_type)
return random_string(7) + (extn or "")
@frappe.whitelist(allow_guest=True)
def validate_filename(filename):
from frappe.utils import now_datetime
timestamp = now_datetime().strftime(" %Y-%m-%d %H:%M:%S")
fname = get_file_name(filename, timestamp)
return fname
@frappe.whitelist()
def add_attachments(doctype, name, attachments):
'''Add attachments to the given DocType'''

View file

@ -28,7 +28,7 @@ class PersonalDataDownloadRequest(Document):
})
f.save(ignore_permissions=True)
file_link = frappe.utils.get_url("/api/method/frappe.core.doctype.file.file.download_file") +\
file_link = frappe.utils.get_url("/api/method/frappe.utils.file_manager.download_file") +\
"?" + get_signed_params({"file_url": f.file_url})
host_name = frappe.local.site
frappe.sendmail(

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 158 KiB