[1/3] file-api: code migration

migrate api from file_manager.py to file.py

Signed-off-by: Chinmay Pai <chinmaydpai@gmail.com>
This commit is contained in:
Chinmay Pai 2018-08-31 01:55:15 +05:30
parent ff03d8d0fe
commit 16a99f5472
No known key found for this signature in database
GPG key ID: 75507BE256F40CED
11 changed files with 74 additions and 52 deletions

View file

@ -9,7 +9,6 @@ import frappe.utils
import json, os
from six import iteritems, string_types, integer_types
from frappe.utils.file_manager import save_file
'''
Handle RESTful requests that are mapped to the `/api/resource` route.
@ -351,7 +350,9 @@ def attach_file(filename=None, filedata=None, doctype=None, docname=None, folder
if not doc.has_permission():
frappe.throw(_("Not permitted"), frappe.PermissionError)
f = save_file(filename, filedata, doctype, docname, folder, decode_base64, is_private, docfield)
_file = frappe.get_doc("File", {"file_name": filename, "content": filedata,
"attached_to_doctype": doctype, "attached_to_name": docname})
f = _file.save_file(folder=folder, decode=decode_base64, is_private=is_private, df=docfield)
if docfield and doctype:
doc.set(docfield, f.file_url)

View file

@ -457,7 +457,6 @@ def upload(rows = None, submit_after_import=None, ignore_encoding_errors=False,
if error_flag and data_import_doc.skip_errors and len(data) != len(data_rows_with_error):
import_status = "Partially Successful"
# write the file with the faulty row
from frappe.utils.file_manager import save_file
file_name = 'error_' + filename + file_extension
if file_extension == '.xlsx':
from frappe.utils.xlsxutils import make_xlsx
@ -466,8 +465,9 @@ def upload(rows = None, submit_after_import=None, ignore_encoding_errors=False,
else:
from frappe.utils.csvutils import to_csv
file_data = to_csv(data_rows_with_error)
error_data_file = save_file(file_name, file_data, "Data Import",
data_import_doc.name, "Home/Attachments")
_file = frappe.get_doc("File", {"file_name": file_name, "content": file_data,
"attached_to_doctype": "Data Import", "attached_to_name": data_import_doc.name})
error_data_file = _file.save_file(folder="Home/Attachments")
data_import_doc.error_file = error_data_file.file_url
elif error_flag:

View file

@ -421,12 +421,11 @@ class File(NestedSet):
frappe.uploaded_content = base64.b64decode(frappe.form_dict.filedata)
frappe.uploaded_filename = frappe.form_dict.filename
return frappe.uploaded_filename, frappe.uploaded_content
else:
frappe.msgprint(_('No file attached'))
frappe.msgprint(_('No file attached'))
return None, None
def save_file(self, decode=False):
def save_file(self, folder=None, decode=False, is_private=0, df=None):
if decode:
if isinstance(self.content, text_type):
self.content = self.content.encode("utf-8")
@ -436,10 +435,11 @@ class File(NestedSet):
self.content = base64.b64decode(self.content)
file_size = check_max_file_size(self.content)
content_hash = get_content_hash(self.content)
self.content_hash = get_content_hash(self.content)
self.content_type = mimetypes.guess_type(self.fname)[0]
self.fname = self.get_file_name(content_hash[-6:])
file_data = get_file_data_from_hash(content_hash, is_private=self.is_private)
self.is_private = is_private
file_data = self.get_file_data_from_hash(is_private=self.is_private)
if not file_data:
call_hook_method("before_write_file", file_size=file_size)
@ -467,6 +467,15 @@ class File(NestedSet):
return f
def get_file_data_from_hash(self, is_private=0):
for name in frappe.db.sql_list("select name from `tabFile` where content_hash=%s and is_private=%s",
(self.content_hash, is_private)):
b = frappe.get_doc('File', name)
return {k: b.get(k) for k in frappe.get_hooks()['write_file_keys']}
return False
def save_file_on_filesystem(self):
fpath = write_file(self.content, self.fname, self.is_private)
@ -625,12 +634,6 @@ def check_max_file_size(content):
return file_size
def get_file_data_from_hash(content_hash, is_private=0):
for name in frappe.db.sql_list("select name from `tabFile` where content_hash=%s and is_private=%s", (content_hash, is_private)):
b = frappe.get_doc('File', name)
return {k: b.get(k) for k in frappe.get_hooks()['write_file_keys']}
return False
def write_file(content, fname, is_private=0):
"""write file to disk with a random name (to compare)"""
file_path = get_files_path(is_private=is_private)
@ -827,7 +830,9 @@ def extract_images_from_html(doc, content):
name = doc.parent or doc.name
# TODO fix this
file_url = save_file(filename, content, doctype, name, decode=True).get("file_url")
_file = frappe.get_doc("File", {"file_name": filename, "content": content,
"attached_to_doctype": doctype, "attached_to_name": name})
file_url = _file.save_file(decode=True).file_url
if not frappe.flags.has_dataurl:
frappe.flags.has_dataurl = True

View file

@ -5,7 +5,7 @@ from __future__ import unicode_literals
import frappe
import unittest
from frappe.utils.file_manager import save_file, get_files_path
from frappe.utils.file_manager import get_files_path
from frappe import _
from frappe.core.doctype.file.file import move_file
# test_records = frappe.get_test_records('File')
@ -27,8 +27,9 @@ class TestFile(unittest.TestCase):
frappe.delete_doc("File", f[0])
def upload_file(self):
self.saved_file = save_file('file_copy.txt', "Testing file copy example.",\
"", "", self.get_folder("Test Folder 1", "Home").name)
_file = frappe.get_doc("File", {"file_name": "file_copy.txt", "content": "Testing file copy example.",
"attached_to_name": "", "attached_to_doctype": ""})
self.saved_file = _file.save_file(folder=self.get_folder("Test Folder 1", "Home").name)
self.saved_filename = get_files_path(self.saved_file.file_name)
def get_folder(self, folder_name, parent_folder="Home"):
@ -61,8 +62,9 @@ class TestFile(unittest.TestCase):
def test_folder_copy(self):
folder = self.get_folder("Test Folder 2", "Home")
folder = self.get_folder("Test Folder 3", "Home/Test Folder 2")
self.saved_file = save_file('folder_copy.txt', "Testing folder copy example.", "", "", folder.name)
_file = frappe.get_doc("File", {"file_name": "folder_copy.txt", "content": "Testing folder copy example.",
"attached_to_name": "", "attached_to_doctype": ""})
self.saved_file = _file.save_file(folder=folder.name)
move_file([{"name": folder.name}], 'Home/Test Folder 1', folder.folder)
@ -91,7 +93,9 @@ class TestFile(unittest.TestCase):
self.assertEqual(frappe.db.get_value("File", _("Home/Test Folder 1"), "file_size"), 0)
folder = self.get_folder("Test Folder 3", "Home/Test Folder 1")
self.saved_file = save_file('folder_copy.txt', "Testing folder copy example.", "", "", folder.name)
_file = frappe.get_doc("File", {"file_name": "folder_copy.txt", "content": "Testing folder copy example.",
"attached_to_name": "", "attached_to_doctype": ""})
self.saved_file = _file.save_file(folder=folder.name)
folder = frappe.get_doc("File", "Home/Test Folder 1/Test Folder 3")
self.assertRaises(frappe.ValidationError, folder.delete)
@ -114,8 +118,11 @@ class TestFile(unittest.TestCase):
# Rebuild the frappe.local.conf to take up the changes from site_config
frappe.local.conf = _dict(frappe.get_site_config())
self.assertRaises(MaxFileSizeReachedError, save_file, '_test_max_space.txt',
'This files test for max space usage', "", "", self.get_folder("Test Folder 2", "Home").name)
_file = frappe.get_doc("File", {"file_name": "_test_max_space.txt",
"content": "This file tests for max space usage",
"attached_to_name": "", "attached_to_doctype": ""})
self.assertRaises(MaxFileSizeReachedError,
_file.save_file(self.get_folder("Test Folder 2", "Home").name))
# Scrub the site_config and rebuild frappe.local.conf
clear_limit("space")

View file

@ -11,7 +11,7 @@ import frappe
from frappe.model.document import Document
from frappe.utils.background_jobs import enqueue
from frappe.desk.query_report import generate_report_result, get_columns_dict
from frappe.utils.file_manager import save_file, remove_all
from frappe.utils.file_manager import remove_all
from frappe.utils.csvutils import to_csv, read_csv_content_from_attached_file
from frappe.desk.form.load import get_attachments
from frappe.utils.file_manager import download_file
@ -62,14 +62,9 @@ def create_csv_file(columns, data, dt, dn):
rows = [tuple(columns)] + data
encoded = base64.b64encode(frappe.safe_encode(to_csv(rows)))
# Call save_file function to upload and attach the file
save_file(
fname=csv_filename,
content=encoded,
dt=dt,
dn=dn,
folder=None,
decode=True,
is_private=False)
_file = frappe.get_doc("File", {"file_name": csv_filename, "content": encoded,
"attached_to_doctype": dt, "attached_to_name": dn})
_file.save_file(decode=True)
@frappe.whitelist()

View file

@ -7,7 +7,6 @@ import frappe, json, os
from frappe.utils import strip, cint
from frappe.translate import (set_default_language, get_dict, send_translations)
from frappe.geo.country_info import get_country_info
from frappe.utils.file_manager import save_file
from frappe.utils.password import update_password
from werkzeug.useragents import UserAgent
from . import install_fixtures
@ -187,7 +186,9 @@ def update_user_name(args):
attach_user = args.get("attach_user").split(",")
if len(attach_user)==3:
filename, filetype, content = attach_user
fileurl = save_file(filename, content, "User", args.get("name"), decode=True).file_url
_file = frappe.get_doc("File", {"file_name": filename, "content": content,
"attached_to_doctype": "User", "attached_to_doctype": args.get("name")})
fileurl = _file.save_file(decode=True).file_url
frappe.db.set_value("User", args.get("name"), "user_image", fileurl)
if args.get('name'):

View file

@ -13,7 +13,7 @@ from frappe import _, safe_decode, safe_encode
from frappe.utils import (extract_email_id, convert_utc_to_user_timezone, now,
cint, cstr, strip, markdown, parse_addr)
from frappe.utils.scheduler import log
from frappe.utils.file_manager import get_random_filename, save_file, MaxFileSizeReachedError
from frappe.utils.file_manager import get_random_filename, MaxFileSizeReachedError
class EmailSizeExceededError(frappe.ValidationError): pass
class EmailTimeoutError(frappe.ValidationError): pass
@ -523,8 +523,10 @@ class Email:
for attachment in self.attachments:
try:
file_data = save_file(attachment['fname'], attachment['fcontent'],
doc.doctype, doc.name, is_private=1)
_file = frappe.get_doc("File", {"file_name": attachment['fname'],
"content": atachment['fcontent'], "attached_to_doctype": doc.doctype,
"attached_to_name": doc.name})
file_data = _file.save_file(is_private=1)
saved_attachments.append(file_data)
if attachment['fname'] in self.cid_map:

View file

@ -6,7 +6,7 @@ import frappe
import os
import unittest
from frappe.utils.file_manager import save_file, get_file, get_files_path
from frappe.utils.file_manager import get_file, get_files_path
test_content1 = 'Hello'
test_content2 = 'Hello World'
@ -22,7 +22,9 @@ class TestSimpleFile(unittest.TestCase):
def setUp(self):
self.attached_to_doctype, self.attached_to_docname = make_test_doc()
self.test_content = test_content1
self.saved_file = save_file('hello.txt', self.test_content, self.attached_to_doctype, self.attached_to_docname)
_file = frappe.get_doc("File", {"file_name": "hello.txt", "content": self.test_content,
"attached_to_doctype": self.attached_to_doctype, "attached_to_docname": self.attached_to_docname})
self.saved_file = _file.save_file()
self.saved_filename = get_files_path(self.saved_file.file_name)
def test_save(self):
@ -40,8 +42,12 @@ class TestSameFileName(unittest.TestCase):
self.attached_to_doctype, self.attached_to_docname = make_test_doc()
self.test_content1 = test_content1
self.test_content2 = test_content2
self.saved_file1 = save_file('hello.txt', self.test_content1, self.attached_to_doctype, self.attached_to_docname)
self.saved_file2 = save_file('hello.txt', self.test_content2, self.attached_to_doctype, self.attached_to_docname)
_file1 = frappe.get_doc("File", {"file_name": "hello.txt", "content": self.test_content1,
"attached_to_doctype": self.attached_to_doctype, "attached_to_docname": self.attached_to_docname})
_file2 = frappe.get_doc("File", {"file_name": "hello.txt", "content": self.test_content2,
"attached_to_doctype": self.attached_to_doctype, "attached_to_docname": self.attached_to_docname})
self.saved_file1 = _file1.save_file()
self.saved_file2 = _file2.save_file()
self.saved_filename1 = get_files_path(self.saved_file1.file_name)
self.saved_filename2 = get_files_path(self.saved_file2.file_name)
@ -65,8 +71,12 @@ class TestSameContent(unittest.TestCase):
self.test_content2 = test_content1
self.orig_filename = 'hello.txt'
self.dup_filename = 'hello2.txt'
self.saved_file1 = save_file(self.orig_filename, self.test_content1, self.attached_to_doctype1, self.attached_to_docname1)
self.saved_file2 = save_file(self.dup_filename, self.test_content2, self.attached_to_doctype2, self.attached_to_docname2)
_file1 = frappe.get_doc("File", {"file_name": self.orig_filename, "content": self.test_content1,
"attached_to_doctype": self.attached_to_doctype1, "attached_to_docname": self.attached_to_docname1})
_file2 = frappe.get_doc("File", {"file_name": self.dup_filename, "content": self.test_content2,
"attached_to_doctype": self.attached_to_doctype2, "attached_to_docname": self.attached_to_docname2})
self.saved_file1 = _file1.save_file()
self.saved_file2 = _file2.save_file()
self.saved_filename1 = get_files_path(self.saved_file1.file_name)
self.saved_filename2 = get_files_path(self.saved_file2.file_name)

View file

@ -333,10 +333,11 @@ def get_qr_svg_code(totp_uri):
def qrcode_as_png(user, totp_uri):
'''Save temporary Qrcode to server.'''
from frappe.utils.file_manager import save_file
folder = create_barcode_folder()
png_file_name = '{}.png'.format(frappe.generate_hash(length=20))
file_obj = save_file(png_file_name, png_file_name, 'User', user, folder=folder)
_file = frappe.get_doc("File", {"file_name": png_file_name, "content": png_file_name,
"attached_to_doctype": 'User', "attached_to_name": user})
file_obj = _file.save_file(folder=folder)
frappe.db.commit()
file_url = get_url(file_obj.file_url)
file_path = os.path.join(frappe.get_site_path('public', 'files'), file_obj.file_name)

View file

@ -15,8 +15,8 @@ def read_csv_content_from_uploaded_file(ignore_encoding=False):
with open(frappe.uploaded_file, "r") as upfile:
fcontent = upfile.read()
else:
from frappe.utils.file_manager import get_uploaded_content
fname, fcontent = get_uploaded_content()
_file = frappe.get_doc("File")
fname, fcontent = _file.get_uploaded_content()
return read_csv_content(fcontent, ignore_encoding)
def read_csv_content_from_attached_file(doc):

View file

@ -6,7 +6,6 @@ import frappe, json, os
from frappe.website.website_generator import WebsiteGenerator
from frappe import _, scrub
from frappe.utils import cstr
from frappe.utils.file_manager import save_file
from frappe.website.utils import get_comment_list
from frappe.custom.doctype.customize_form.customize_form import docfield_properties
from frappe.utils.file_manager import get_max_file_size
@ -421,8 +420,9 @@ def accept(web_form, data, for_payment=False):
# save new file
filename, dataurl = filedata.split(',', 1)
filedoc = save_file(filename, dataurl,
doc.doctype, doc.name, decode=True)
_file = frappe.get_doc("File", {"file_name": filename, "content": dataurl,
"attached_to_doctype": doc.doctype, "attached_to_name": doc.name})
filedoc = _file.save_file(decode=True)
# update values
doc.set(fieldname, filedoc.file_url)