330 lines
9 KiB
Python
330 lines
9 KiB
Python
import webnotes
|
|
|
|
form = webnotes.form
|
|
session = webnotes.session
|
|
sql = webnotes.conn.sql
|
|
out = webnotes.response
|
|
|
|
from webnotes.utils import cint
|
|
|
|
def get_search_criteria_list(dt):
|
|
sc_list = sql("select criteria_name, doc_type from `tabSearch Criteria` where doc_type = '%s' or parent_doc_type = '%s'" % (dt, dt))
|
|
return [list(s) for s in sc_list]
|
|
|
|
def load_report_list():
|
|
webnotes.response['rep_list'] = get_search_criteria_list(form.getvalue('dt'))
|
|
|
|
|
|
# Get, scrub metadata
|
|
# ====================================================================
|
|
|
|
def get_sql_tables(q):
|
|
if q.find('WHERE') != -1:
|
|
tl = q.split('FROM')[1].split('WHERE')[0].split(',')
|
|
elif q.find('GROUP BY') != -1:
|
|
tl = q.split('FROM')[1].split('GROUP BY')[0].split(',')
|
|
else:
|
|
tl = q.split('FROM')[1].split('ORDER BY')[0].split(',')
|
|
return [t.strip().strip('`')[3:] for t in tl]
|
|
|
|
|
|
def get_parent_dt(dt):
|
|
pdt = ''
|
|
if sql('select name from `tabDocType` where istable=1 and name="%s"' % dt):
|
|
res = sql('select parent from `tabDocField` where fieldtype="Table" and options="%s"' % dt)
|
|
if res: pdt = res[0][0]
|
|
return pdt
|
|
|
|
def get_sql_meta(tl):
|
|
std_columns = {
|
|
'owner':('Owner', '', '', '100'),
|
|
'creation':('Created on', 'Date', '', '100'),
|
|
'modified':('Last modified on', 'Date', '', '100'),
|
|
'modified_by':('Modified By', '', '', '100')
|
|
}
|
|
|
|
meta = {}
|
|
|
|
for dt in tl:
|
|
meta[dt] = std_columns.copy()
|
|
|
|
# for table doctype, the ID is the parent id
|
|
pdt = get_parent_dt(dt)
|
|
if pdt:
|
|
meta[dt]['parent'] = ('ID', 'Link', pdt, '200')
|
|
|
|
# get the field properties from DocField
|
|
res = sql("select fieldname, label, fieldtype, options, width from tabDocField where parent='%s'" % dt)
|
|
for r in res:
|
|
if r[0]:
|
|
meta[dt][r[0]] = (r[1], r[2], r[3], r[4]);
|
|
|
|
# name
|
|
meta[dt]['name'] = ('ID', 'Link', dt, '200')
|
|
|
|
return meta
|
|
|
|
# Additional conditions to fulfill match permission rules
|
|
# ====================================================================
|
|
|
|
def getmatchcondition(dt, ud, ur):
|
|
res = sql("SELECT `role`, `match` FROM tabDocPerm WHERE parent = '%s' AND (`read`=1) AND permlevel = 0" % dt)
|
|
cond = []
|
|
for r in res:
|
|
if r[0] in ur: # role applicable to user
|
|
if r[1]:
|
|
defvalues = ud.get(r[1],['_NA'])
|
|
for d in defvalues:
|
|
cond.append('`tab%s`.`%s`="%s"' % (dt, r[1], d))
|
|
else: # nomatch i.e. full read rights
|
|
return ''
|
|
|
|
return ' OR '.join(cond)
|
|
|
|
def add_match_conditions(q, tl, ur, ud):
|
|
sl = []
|
|
for dt in tl:
|
|
s = getmatchcondition(dt, ud, ur)
|
|
if s:
|
|
sl.append(s)
|
|
|
|
# insert the conditions
|
|
if sl:
|
|
condition_st = q.find('WHERE')!=-1 and ' AND ' or ' WHERE '
|
|
|
|
condition_end = q.find('ORDER BY')!=-1 and 'ORDER BY' or 'LIMIT'
|
|
condition_end = q.find('GROUP BY')!=-1 and 'GROUP BY' or condition_end
|
|
|
|
if q.find('ORDER BY')!=-1 or q.find('LIMIT')!=-1 or q.find('GROUP BY')!=-1: # if query continues beyond conditions
|
|
q = q.split(condition_end)
|
|
q = q[0] + condition_st + '(' + ' OR '.join(sl) + ') ' + condition_end + q[1]
|
|
else:
|
|
q = q + condition_st + '(' + ' OR '.join(sl) + ')'
|
|
|
|
return q
|
|
|
|
# execute server-side script from Search Criteria
|
|
# ====================================================================
|
|
|
|
def exec_report(code, res, colnames=[], colwidths=[], coltypes=[], coloptions=[], filter_values={}, query='', from_export=0):
|
|
col_idx, i, out, style, header_html, footer_html, page_template = {}, 0, None, [], '', '', ''
|
|
for c in colnames:
|
|
col_idx[c] = i
|
|
i+=1
|
|
|
|
# load globals (api)
|
|
from webnotes import *
|
|
from webnotes.utils import *
|
|
from webnotes.model.doc import *
|
|
from webnotes.model.doclist import getlist
|
|
from webnotes.model.db_schema import updatedb
|
|
from webnotes.model.code import get_obj
|
|
|
|
set = webnotes.conn.set
|
|
sql = webnotes.conn.sql
|
|
get_value = webnotes.conn.get_value
|
|
convert_to_lists = webnotes.conn.convert_to_lists
|
|
NEWLINE = '\n'
|
|
|
|
exec str(code)
|
|
|
|
if out!=None:
|
|
res = out
|
|
|
|
return res, style, header_html, footer_html, page_template
|
|
|
|
# ====================================================================
|
|
|
|
def guess_type(m):
|
|
"""
|
|
Returns fieldtype depending on the MySQLdb Description
|
|
"""
|
|
import MySQLdb
|
|
if m in MySQLdb.NUMBER:
|
|
return 'Currency'
|
|
elif m in MySQLdb.DATE:
|
|
return 'Date'
|
|
else:
|
|
return 'Data'
|
|
|
|
def build_description_simple():
|
|
colnames, coltypes, coloptions, colwidths = [], [], [], []
|
|
|
|
for m in webnotes.conn.get_description():
|
|
colnames.append(m[0])
|
|
coltypes.append(guess_type[m[0]])
|
|
coloptions.append('')
|
|
colwidths.append('100')
|
|
|
|
return colnames, coltypes, coloptions, colwidths
|
|
|
|
# ====================================================================
|
|
|
|
def build_description_standard(meta, tl):
|
|
|
|
desc = webnotes.conn.get_description()
|
|
|
|
colnames, coltypes, coloptions, colwidths = [], [], [], []
|
|
|
|
# merged metadata - used if we are unable to
|
|
# get both the table name and field name from
|
|
# the description - in case of joins
|
|
merged_meta = {}
|
|
for d in meta:
|
|
merged_meta.update(meta[d])
|
|
|
|
for f in desc:
|
|
fn, dt = f[0], ''
|
|
if '.' in fn:
|
|
dt, fn = fn.split('.')
|
|
|
|
if (not dt) and merged_meta.get(fn):
|
|
# no "AS" given, find type from merged description
|
|
|
|
desc = merged_meta[fn]
|
|
colnames.append(desc[0] or fn)
|
|
coltypes.append(desc[1] or '')
|
|
coloptions.append(desc[2] or '')
|
|
colwidths.append(desc[3] or '100')
|
|
|
|
elif meta.get(dt,{}).has_key(fn):
|
|
# type specified for a multi-table join
|
|
# usually from Report Builder
|
|
|
|
desc = meta[dt][fn]
|
|
colnames.append(desc[0] or fn)
|
|
coltypes.append(desc[1] or '')
|
|
coloptions.append(desc[2] or '')
|
|
colwidths.append(desc[3] or '100')
|
|
|
|
else:
|
|
# nothing found
|
|
# guess
|
|
|
|
colnames.append(fn)
|
|
coltypes.append(guess_type(f[1]))
|
|
coloptions.append('')
|
|
colwidths.append('100')
|
|
|
|
return colnames, coltypes, coloptions, colwidths
|
|
|
|
# Entry Point - Run the query
|
|
# ====================================================================
|
|
|
|
def runquery(q='', ret=0, from_export=0):
|
|
import webnotes.utils
|
|
|
|
formatted = cint(form.getvalue('formatted'))
|
|
|
|
# CASE A: Simple Query
|
|
# --------------------
|
|
if form.getvalue('simple_query') or form.getvalue('is_simple'):
|
|
q = form.getvalue('simple_query') or form.getvalue('query')
|
|
if q.split()[0].lower() != 'select':
|
|
raise Exception, 'Query must be a SELECT'
|
|
|
|
as_dict = cint(form.getvalue('as_dict'))
|
|
res = sql(q, as_dict = as_dict, as_list = not as_dict, formatted=formatted)
|
|
|
|
# build colnames etc from metadata
|
|
colnames, coltypes, coloptions, colwidths = [], [], [], []
|
|
|
|
# CASE B: Standard Query
|
|
# -----------------------
|
|
else:
|
|
if not q: q = form.getvalue('query')
|
|
|
|
tl = get_sql_tables(q)
|
|
meta = get_sql_meta(tl)
|
|
|
|
q = add_match_conditions(q, tl, webnotes.user.roles, webnotes.user.get_defaults())
|
|
|
|
# replace special variables
|
|
q = q.replace('__user', session['user'])
|
|
q = q.replace('__today', webnotes.utils.nowdate())
|
|
|
|
res = sql(q, as_list=1, formatted=formatted)
|
|
|
|
colnames, coltypes, coloptions, colwidths = build_description_standard(meta, tl)
|
|
|
|
# run server script
|
|
# -----------------
|
|
style, header_html, footer_html, page_template = '', '', '', ''
|
|
if form.has_key('sc_id') and form.getvalue('sc_id'):
|
|
sc_id = form.getvalue('sc_id')
|
|
from webnotes.model.code import get_code
|
|
sc_details = webnotes.conn.sql("select module, standard, server_script from `tabSearch Criteria` where name=%s", sc_id)[0]
|
|
if sc_details[1]!='No':
|
|
code = get_code(sc_details[0], 'Search Criteria', sc_id, 'py')
|
|
else:
|
|
code = sc_details[2]
|
|
|
|
if code:
|
|
filter_values = form.has_key('filter_values') and eval(form.getvalue('filter_values','')) or {}
|
|
res, style, header_html, footer_html, page_template = exec_report(code, res, colnames, colwidths, coltypes, coloptions, filter_values, q, from_export)
|
|
|
|
out['colnames'] = colnames
|
|
out['coltypes'] = coltypes
|
|
out['coloptions'] = coloptions
|
|
out['colwidths'] = colwidths
|
|
out['header_html'] = header_html
|
|
out['footer_html'] = footer_html
|
|
out['page_template'] = page_template
|
|
|
|
if style:
|
|
out['style'] = style
|
|
|
|
# just the data - return
|
|
if ret==1:
|
|
return res
|
|
|
|
out['values'] = res
|
|
|
|
# return num of entries
|
|
qm = form.has_key('query_max') and form.getvalue('query_max') or ''
|
|
if qm and qm.strip():
|
|
if qm.split()[0].lower() != 'select':
|
|
raise Exception, 'Query (Max) must be a SELECT'
|
|
if not form.has_key('simple_query'):
|
|
qm = add_match_conditions(qm, tl, webnotes.user.roles, webnotes.user.defaults)
|
|
|
|
out['n_values'] = webnotes.utils.cint(sql(qm)[0][0])
|
|
|
|
# Export to CSV
|
|
# ====================================================================
|
|
|
|
def runquery_csv():
|
|
global out
|
|
|
|
# run query
|
|
res = runquery(from_export = 1)
|
|
|
|
q = form.getvalue('query')
|
|
|
|
rep_name = form.getvalue('report_name')
|
|
if not form.has_key('simple_query'):
|
|
|
|
# Report Name
|
|
if not rep_name:
|
|
rep_name = get_sql_tables(q)[0]
|
|
|
|
if not rep_name: rep_name = 'DataExport'
|
|
|
|
# Headings
|
|
heads = []
|
|
|
|
rows = [[rep_name], out['colnames']] + out['values']
|
|
|
|
from cStringIO import StringIO
|
|
import csv
|
|
|
|
f = StringIO()
|
|
writer = csv.writer(f)
|
|
for r in rows:
|
|
writer.writerow(r)
|
|
|
|
f.seek(0)
|
|
out['result'] = f.read()
|
|
out['type'] = 'csv'
|
|
out['doctype'] = rep_name
|
|
|