seitime-frappe/frappe/utils/csvutils.py
Gavin D'souza e407b78506 chore: Drop dead and deprecated code
* Remove six for PY2 compatability since our dependencies are not, PY2
  is legacy.
* Removed usages of utils from future/past libraries since they are
  deprecated. This includes 'from __future__ ...' and 'from past...'
  statements.
* Removed compatibility imports for PY2, switched from six imports to
  standard library imports.
* Removed utils code blocks that handle operations depending on PY2/3
  versions.
* Removed 'from __future__ ...' lines from templates/code generators
* Used PY3 syntaxes in place of PY2 compatible blocks. eg: metaclass
2021-05-26 15:31:29 +05:30

207 lines
5.9 KiB
Python

# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See license.txt
import frappe
from frappe import msgprint, _
import json
import csv
import requests
from io import StringIO
from frappe.utils import encode, cstr, cint, flt, comma_or
def read_csv_content_from_uploaded_file(ignore_encoding=False):
if getattr(frappe, "uploaded_file", None):
with open(frappe.uploaded_file, "r") as upfile:
fcontent = upfile.read()
else:
_file = frappe.new_doc("File")
fcontent = _file.get_uploaded_content()
return read_csv_content(fcontent, ignore_encoding)
def read_csv_content_from_attached_file(doc):
fileid = frappe.get_all("File", fields = ["name"], filters = {"attached_to_doctype": doc.doctype,
"attached_to_name":doc.name}, order_by="creation desc")
if fileid : fileid = fileid[0].name
if not fileid:
msgprint(_("File not attached"))
raise Exception
try:
_file = frappe.get_doc("File", fileid)
fcontent = _file.get_content()
return read_csv_content(fcontent, frappe.form_dict.get('ignore_encoding_errors'))
except Exception:
frappe.throw(_("Unable to open attached file. Did you export it as CSV?"), title=_('Invalid CSV Format'))
def read_csv_content(fcontent, ignore_encoding=False):
rows = []
if not isinstance(fcontent, str):
decoded = False
for encoding in ["utf-8", "windows-1250", "windows-1252"]:
try:
fcontent = str(fcontent, encoding)
decoded = True
break
except UnicodeDecodeError:
continue
if not decoded:
frappe.msgprint(_("Unknown file encoding. Tried utf-8, windows-1250, windows-1252."), raise_exception=True)
fcontent = fcontent.encode("utf-8")
content = [ ]
for line in fcontent.splitlines(True):
content.append(frappe.safe_decode(line))
try:
rows = []
for row in csv.reader(content):
r = []
for val in row:
# decode everything
val = val.strip()
if val=="":
# reason: in maraidb strict config, one cannot have blank strings for non string datatypes
r.append(None)
else:
r.append(val)
rows.append(r)
return rows
except Exception:
frappe.msgprint(_("Not a valid Comma Separated Value (CSV File)"))
raise
@frappe.whitelist()
def send_csv_to_client(args):
if isinstance(args, str):
args = json.loads(args)
args = frappe._dict(args)
frappe.response["result"] = cstr(to_csv(args.data))
frappe.response["doctype"] = args.filename
frappe.response["type"] = "csv"
def to_csv(data):
writer = UnicodeWriter()
for row in data:
writer.writerow(row)
return writer.getvalue()
def build_csv_response(data, filename):
frappe.response["result"] = cstr(to_csv(data))
frappe.response["doctype"] = filename
frappe.response["type"] = "csv"
class UnicodeWriter:
def __init__(self, encoding="utf-8", quoting=csv.QUOTE_NONNUMERIC):
self.encoding = encoding
self.queue = StringIO()
self.writer = csv.writer(self.queue, quoting=quoting)
def writerow(self, row):
self.writer.writerow(row)
def getvalue(self):
return self.queue.getvalue()
def check_record(d):
"""check for mandatory, select options, dates. these should ideally be in doclist"""
from frappe.utils.dateutils import parse_date
doc = frappe.get_doc(d)
for key in d:
docfield = doc.meta.get_field(key)
val = d[key]
if docfield:
if docfield.reqd and (val=='' or val==None):
frappe.msgprint(_("{0} is required").format(docfield.label), raise_exception=1)
if docfield.fieldtype=='Select' and val and docfield.options:
if val not in docfield.options.split('\n'):
frappe.throw(_("{0} must be one of {1}").format(_(docfield.label), comma_or(docfield.options.split("\n"))))
if val and docfield.fieldtype=='Date':
d[key] = parse_date(val)
elif val and docfield.fieldtype in ["Int", "Check"]:
d[key] = cint(val)
elif val and docfield.fieldtype in ["Currency", "Float", "Percent"]:
d[key] = flt(val)
def import_doc(d, doctype, overwrite, row_idx, submit=False, ignore_links=False):
"""import main (non child) document"""
if d.get("name") and frappe.db.exists(doctype, d['name']):
if overwrite:
doc = frappe.get_doc(doctype, d['name'])
doc.flags.ignore_links = ignore_links
doc.update(d)
if d.get("docstatus") == 1:
doc.update_after_submit()
elif d.get("docstatus") == 0 and submit:
doc.submit()
else:
doc.save()
return 'Updated row (#%d) %s' % (row_idx + 1, getlink(doctype, d['name']))
else:
return 'Ignored row (#%d) %s (exists)' % (row_idx + 1,
getlink(doctype, d['name']))
else:
doc = frappe.get_doc(d)
doc.flags.ignore_links = ignore_links
doc.insert()
if submit:
doc.submit()
return 'Inserted row (#%d) %s' % (row_idx + 1, getlink(doctype,
doc.get('name')))
def getlink(doctype, name):
return '<a href="/app/Form/%(doctype)s/%(name)s">%(name)s</a>' % locals()
def get_csv_content_from_google_sheets(url):
# https://docs.google.com/spreadsheets/d/{sheetid}}/edit#gid={gid}
validate_google_sheets_url(url)
# get gid, defaults to first sheet
if "gid=" in url:
gid = url.rsplit('gid=', 1)[1]
else:
gid = 0
# remove /edit path
url = url.rsplit('/edit', 1)[0]
# add /export path,
url = url + '/export?format=csv&gid={0}'.format(gid)
headers = {
'Accept': 'text/csv'
}
response = requests.get(url, headers=headers)
if response.ok:
# if it returns html, it couldn't find the CSV content
# because of invalid url or no access
if response.text.strip().endswith('</html>'):
frappe.throw(
_('Google Sheets URL is invalid or not publicly accessible.'),
title=_("Invalid URL")
)
return response.content
elif response.status_code == 400:
frappe.throw(_('Google Sheets URL must end with "gid={number}". Copy and paste the URL from the browser address bar and try again.'),
title=_("Incorrect URL"))
else:
response.raise_for_status()
def validate_google_sheets_url(url):
if "docs.google.com/spreadsheets" not in url:
frappe.throw(
_('"{0}" is not a valid Google Sheets URL').format(url),
title=_("Invalid URL"),
)