seitime-frappe/frappe/utils/csvutils.py
Gavin D'souza 3446026555 chore: Update header: license.txt => LICENSE
The license.txt file has been replaced with LICENSE for quite a while
now. IANAL, but it didn't seem accurate to say "hey, check out license.txt"
although there's no such file. Apart from this, there were
inconsistencies in the headers altogether... this change brings
consistency.
2021-09-03 12:02:59 +05:30

198 lines
5.6 KiB
Python

# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
import frappe
from frappe import msgprint, _
import json
import csv
import requests
from io import StringIO
from frappe.utils import cstr, cint, flt, comma_or
def read_csv_content_from_attached_file(doc):
    """Return the parsed CSV rows of the newest File attached to ``doc``.

    The File records attached to ``doc`` (matched on ``doc.doctype`` and
    ``doc.name``) are fetched newest-first and only the first one is read.
    Its content is handed to ``read_csv_content`` together with the
    ``ignore_encoding_errors`` value found in ``frappe.form_dict``.

    Raises:
        Exception: a bare ``Exception`` (after a "File not attached"
            message) when no attached File is found.
        Any failure while loading or parsing the file is reported through
        ``frappe.throw`` with an "Invalid CSV Format" title.
    """
    attachments = frappe.get_all(
        "File",
        fields=["name"],
        filters={
            "attached_to_doctype": doc.doctype,
            "attached_to_name": doc.name,
        },
        order_by="creation desc",
    )

    # Newest attachment wins; a missing or empty name counts as "no file".
    file_name = attachments[0].name if attachments else attachments
    if not file_name:
        msgprint(_("File not attached"))
        raise Exception

    try:
        attached_file = frappe.get_doc("File", file_name)
        raw_content = attached_file.get_content()
        return read_csv_content(raw_content, frappe.form_dict.get("ignore_encoding_errors"))
    except Exception:
        # Deliberately broad: every failure here is shown to the user as a
        # CSV format problem.
        frappe.throw(_("Unable to open attached file. Did you export it as CSV?"), title=_("Invalid CSV Format"))
def read_csv_content(fcontent, ignore_encoding=False):
    """Parse CSV text or bytes into a list of rows.

    Args:
        fcontent: CSV data as ``str``, or as bytes that are decoded by
            trying utf-8, windows-1250 and windows-1252 in that order.
        ignore_encoding: accepted for compatibility; this function does not
            read it.

    Returns:
        A list of rows, each a list of cells. Every cell is stripped of
        surrounding whitespace and cells that end up empty become ``None``.

    Raises:
        Whatever ``frappe.msgprint(..., raise_exception=True)`` raises when
        none of the encodings can decode the input; any error raised while
        parsing is re-raised after a message is shown.
    """
    if not isinstance(fcontent, str):
        for candidate in ("utf-8", "windows-1250", "windows-1252"):
            try:
                fcontent = str(fcontent, candidate)
            except UnicodeDecodeError:
                continue
            else:
                break
        else:
            # No candidate encoding could decode the payload.
            frappe.msgprint(_("Unknown file encoding. Tried utf-8, windows-1250, windows-1252."), raise_exception=True)

    # Lines are split on the utf-8 *bytes*, which only breaks on ASCII line
    # endings; str.splitlines would also break on Unicode separators that
    # may legitimately occur inside a cell. Line endings are kept so that
    # quoted multi-line cells survive.
    encoded_lines = fcontent.encode("utf-8").splitlines(True)
    # presumably safe_decode turns each bytes line back into str - verify
    content = [frappe.safe_decode(raw_line) for raw_line in encoded_lines]

    try:
        # Blank cells become None. Reason: in mariadb strict config, one
        # cannot have blank strings for non string datatypes.
        return [
            [(cell.strip() or None) for cell in record]
            for record in csv.reader(content)
        ]
    except Exception:
        frappe.msgprint(_("Not a valid Comma Separated Value (CSV File)"))
        raise
@frappe.whitelist()
def send_csv_to_client(args):
    """Whitelisted endpoint that returns the given rows as a CSV response.

    Args:
        args: a mapping, or a JSON string encoding one, with the keys
            ``data`` (the rows to serialize) and ``filename``.

    The response fields are filled in by ``build_csv_response`` so that the
    shape of a CSV response is defined in exactly one place.
    """
    if isinstance(args, str):
        args = json.loads(args)

    args = frappe._dict(args)
    build_csv_response(args.data, args.filename)
def to_csv(data):
    """Serialize an iterable of rows into one CSV-formatted string.

    Each item of ``data`` is written as one row through ``UnicodeWriter``
    using that class's default settings.
    """
    csv_buffer = UnicodeWriter()
    for record in data:
        csv_buffer.writerow(record)
    return csv_buffer.getvalue()
def build_csv_response(data, filename):
    """Fill ``frappe.response`` with ``data`` rendered as CSV.

    Sets three response keys: ``result`` (the CSV text), ``doctype``
    (set to ``filename``) and ``type`` (the literal ``"csv"``).
    """
    csv_text = cstr(to_csv(data))

    response = frappe.response
    response["result"] = csv_text
    response["doctype"] = filename
    response["type"] = "csv"
class UnicodeWriter:
    """In-memory CSV writer that accumulates rows in a text buffer."""

    def __init__(self, encoding="utf-8", quoting=csv.QUOTE_NONNUMERIC):
        """Create the buffer and the ``csv.writer`` that feeds it.

        ``encoding`` is only stored on the instance; the methods of this
        class do not read it. ``quoting`` is passed through to ``csv.writer``.
        """
        buffer = StringIO()
        self.encoding = encoding
        self.queue = buffer
        self.writer = csv.writer(buffer, quoting=quoting)

    def writerow(self, row):
        """Append one row to the buffer."""
        self.writer.writerow(row)

    def getvalue(self):
        """Return everything written so far as a single string."""
        return self.queue.getvalue()
def check_record(d):
    """Validate and normalize the values of one record, in place.

    For every key of ``d`` that maps to a field in the document's meta:

    * a required field must not be ``''`` or ``None``;
    * a non-empty Select value must be one of the newline-separated options;
    * non-empty Date values are parsed with ``parse_date``, Int/Check values
      are converted with ``cint`` and Currency/Float/Percent values with
      ``flt``, and the converted value is written back into ``d``.

    Keys without a matching field are left untouched. Violations are
    reported through ``frappe.msgprint(..., raise_exception=1)`` or
    ``frappe.throw``.
    """
    from frappe.utils.dateutils import parse_date

    doc = frappe.get_doc(d)

    # Only existing keys are reassigned below, so iterating ``d`` directly
    # while updating it is safe.
    for key in d:
        docfield = doc.meta.get_field(key)
        if not docfield:
            continue

        val = d[key]

        if docfield.reqd and (val == "" or val is None):
            frappe.msgprint(_("{0} is required").format(docfield.label), raise_exception=1)

        if docfield.fieldtype == "Select" and val and docfield.options:
            options = docfield.options.split("\n")
            if val not in options:
                frappe.throw(_("{0} must be one of {1}").format(_(docfield.label), comma_or(options)))

        # Empty values are never converted.
        if not val:
            continue

        if docfield.fieldtype == "Date":
            d[key] = parse_date(val)
        elif docfield.fieldtype in ["Int", "Check"]:
            d[key] = cint(val)
        elif docfield.fieldtype in ["Currency", "Float", "Percent"]:
            d[key] = flt(val)
def import_doc(d, doctype, overwrite, row_idx, submit=False, ignore_links=False):
    """Insert or update one main (non child) document from an import row.

    Args:
        d: mapping of field values for the document.
        doctype: doctype used for the existence check and the result link.
        overwrite: when the document already exists, update it if true,
            otherwise leave it alone.
        row_idx: zero-based row index; reported one-based in the result.
        submit: submit the document after inserting it, or when updating a
            row whose ``docstatus`` is 0.
        ignore_links: copied onto ``doc.flags.ignore_links`` before writing.

    Returns:
        A status string ("Inserted row", "Updated row" or "Ignored row")
        that embeds an HTML link to the document.
    """
    already_exists = d.get("name") and frappe.db.exists(doctype, d["name"])

    if not already_exists:
        new_doc = frappe.get_doc(d)
        new_doc.flags.ignore_links = ignore_links
        new_doc.insert()
        if submit:
            new_doc.submit()
        return "Inserted row (#%d) %s" % (row_idx + 1, getlink(doctype, new_doc.get("name")))

    if not overwrite:
        return "Ignored row (#%d) %s (exists)" % (row_idx + 1, getlink(doctype, d["name"]))

    existing_doc = frappe.get_doc(doctype, d["name"])
    existing_doc.flags.ignore_links = ignore_links
    existing_doc.update(d)

    # The docstatus carried by the row decides how the update is persisted.
    row_docstatus = d.get("docstatus")
    if row_docstatus == 1:
        existing_doc.update_after_submit()
    elif row_docstatus == 0 and submit:
        existing_doc.submit()
    else:
        existing_doc.save()

    return "Updated row (#%d) %s" % (row_idx + 1, getlink(doctype, d["name"]))
def getlink(doctype, name):
    """Return an HTML anchor pointing at the form view of a document.

    Both values are HTML-escaped before being placed in the ``href``
    attribute and the link text, so a name containing markup characters
    (``<``, ``>``, ``&`` or quotes) cannot break out of the anchor. Values
    without such characters produce exactly the same output as before.

    Args:
        doctype: doctype of the document.
        name: name of the document; converted with ``str()`` first.
    """
    from html import escape

    safe_doctype = escape(str(doctype), quote=True)
    safe_name = escape(str(name), quote=True)
    return '<a href="/app/Form/%s/%s">%s</a>' % (safe_doctype, safe_name, safe_name)
def get_csv_content_from_google_sheets(url):
    """Download a Google Sheets tab as CSV and return the raw response body.

    Args:
        url: sheet URL as copied from the browser, of the form
            https://docs.google.com/spreadsheets/d/{sheetid}/edit#gid={gid}

    Returns:
        ``response.content`` of the CSV export request.

    Raises:
        Via ``frappe.throw`` when the URL fails validation, when the export
        answers with an HTML page instead of CSV, or when it answers with
        HTTP 400. Other HTTP errors are raised by
        ``response.raise_for_status()``. A request that exceeds the timeout
        raises the ``requests`` timeout exception.
    """
    validate_google_sheets_url(url)

    # get gid, defaults to first sheet
    gid = url.rsplit("gid=", 1)[1] if "gid=" in url else 0

    # remove the /edit path (and whatever follows it), then add /export
    base_url = url.rsplit("/edit", 1)[0]
    export_url = base_url + "/export?format=csv&gid={0}".format(gid)

    # Without a timeout a stalled connection would block the caller forever.
    request_timeout_seconds = 30
    response = requests.get(
        export_url,
        headers={"Accept": "text/csv"},
        timeout=request_timeout_seconds,
    )

    if response.ok:
        # if it returns html, it couldn't find the CSV content
        # because of invalid url or no access
        if response.text.strip().endswith("</html>"):
            frappe.throw(
                _("Google Sheets URL is invalid or not publicly accessible."),
                title=_("Invalid URL"),
            )
        return response.content
    elif response.status_code == 400:
        frappe.throw(
            _('Google Sheets URL must end with "gid={number}". Copy and paste the URL from the browser address bar and try again.'),
            title=_("Incorrect URL"),
        )
    else:
        response.raise_for_status()
def validate_google_sheets_url(url):
    """Reject anything that is not an https Google Sheets URL.

    The URL is parsed and accepted only when its scheme is ``https``, its
    host is exactly ``docs.google.com`` and its path contains
    ``/spreadsheets/``. A plain substring check is not enough: it would
    also accept a URL on any other host that merely mentions
    ``docs.google.com/spreadsheets`` somewhere (for example in its query
    string), and the server would then fetch that URL.

    Returns ``None`` for a valid URL; otherwise reports the problem through
    ``frappe.throw`` with an "Invalid URL" title.
    """
    from urllib.parse import urlparse

    try:
        parsed = urlparse(url)
        is_valid = (
            parsed.scheme == "https"
            and parsed.hostname == "docs.google.com"
            and "/spreadsheets/" in parsed.path
        )
    except ValueError:
        # urlparse / hostname raise ValueError on malformed netlocs.
        is_valid = False

    if not is_valid:
        frappe.throw(
            _('"{0}" is not a valid Google Sheets URL').format(url),
            title=_("Invalid URL"),
        )