"""Reporter sub-application: report registry, navbar builder and HTTP routes."""
from aiohttp import web
import aiohttp_security
import datetime
import html
import simplejson
import tomllib
import urllib.parse
from pathlib import Path
from inspect import getmembers, isfunction
import security
from . import queries
from . import subapps

ONE_DAY = datetime.timedelta(days=1)


def _years_before(day: datetime.date, years: int) -> datetime.date:
    """Return *day* moved back by *years* calendar years.

    Standard-library stand-in for ``day - relativedelta(years=years)``
    (``relativedelta`` was referenced but never imported in this module).
    """
    try:
        return day.replace(year=day.year - years)
    except ValueError:
        # 29 February has no counterpart in a non-leap target year.
        return day.replace(year=day.year - years, day=28)


def _default_date_value(default, today: datetime.date) -> str:
    """Resolve a ``date`` control's configured default to an ISO date string."""
    if default == 'last_business_day':
        yesterday = today - ONE_DAY
        # weekday(): Monday=0 ... Sunday=6, so Saturday/Sunday roll back to
        # Friday (4); any other day is kept.
        days_after_friday = max(yesterday.weekday() - 4, 0)
        return (yesterday - days_after_friday * ONE_DAY).isoformat()
    if default == 'first_of_this_month':
        return today.replace(day=1).isoformat()
    if default == '36_months_ago':
        return _years_before(today, 3).isoformat()
    if default == '5_years_ago':
        return _years_before(today, 5).isoformat()
    if default in ('1996-07-04', '1997-02-12'):
        # These two literal dates are the only fixed defaults recognised.
        return default
    return today.isoformat()


def _default_month_value(default, today: datetime.date) -> str:
    """Resolve a ``month`` control's configured default to ``YYYY-MM``."""
    if default == 'last_month':
        # The day before the first of this month lies in the previous month.
        return (today.replace(day=1) - ONE_DAY).strftime('%Y-%m')
    if default == '12_months_ago':
        return _years_before(today, 1).strftime('%Y-%m')
    if default == '36_months_ago':
        return _years_before(today, 3).strftime('%Y-%m')
    return today.strftime('%Y-%m')


def _default_number_value(default, today: datetime.date) -> int:
    """Resolve a ``number`` control's configured default (falls back to 1)."""
    if default == 'this_year':
        return today.year
    if default == 'this_month':
        return today.month
    return 1


def _value_attr(value) -> str:
    """Return a single-quoted HTML ``value`` attribute with *value* escaped.

    Values can come straight from the query string, so they must not be able
    to close the attribute or inject markup.
    """
    return f"value='{html.escape(str(value), quote=True)}'"


class Reports:
    """Registry of report definitions loaded from ``*.toml`` files.

    Each file found (recursively) under *report_path* is parsed and stored
    under its file stem. When the ``queries`` module has a public function
    with the same name, it is attached to the definition as ``'query'``;
    otherwise a message is printed and the report is kept without one.
    """

    def __init__(self, report_path='reports'):
        self.reports = {}
        query = {
            key: value
            for key, value in getmembers(queries, isfunction)
            if not key.startswith("_")
        }
        for file_path in Path(report_path).glob('**/*.toml'):
            with open(file_path, 'rb') as f:
                self.reports[file_path.stem] = tomllib.load(f)
            if file_path.stem not in query:
                print(f"No query function defined for report {file_path.stem}")
            else:
                self.reports[file_path.stem]['query'] = query[file_path.stem]

    def __getitem__(self, item):
        return self.reports[item]

    def __contains__(self, item):
        return item in self.reports

    def __len__(self):
        return len(self.reports)

    def get(self, item, default=None):
        """Return the report named *item*, or *default* when it is unknown."""
        return self.reports.get(item, default)


def build_menu(app):
    """Render the navigation bar from ``app['config']['menus']``.

    Each menu contributes one string (its rendered headings joined together)
    to the left or right side according to ``menu['align']``; the result is
    the ``navbar.html`` template filled with the title and both sides.
    """
    # NOTE(review): several literals in this function look like rendered text
    # rather than markup (bullets, `url` computed but never interpolated,
    # identical `lock` branches) - presumably HTML tags were lost before this
    # copy was made. They are reproduced as received; confirm against version
    # control before relying on them.
    menu_structure = {'left': [], 'right': []}
    for menu in app['config']["menus"]:
        menu_items = []
        for heading in menu['headings']:
            items = []
            for item in heading['items']:
                if item == "divider":
                    items.append('\t\t\t\t\t\t\t\n  • \n')
                else:
                    if menu["type"] == "reports":
                        url = f"/reporter/report/{item}"
                        lock = '' if app['reports'][item].get('permissions') else ''
                        name = lock + app['reports'][item]['title']
                    else:
                        # Non-report items are (path, label) pairs.
                        url = f"/{item[0]}"
                        name = item[1]
                    items.append(f'\t\t\t\t\t\t\t\n  • {name}\n  • \n')
            parts = {
                'id': heading['name'].lower().replace(' ', '_'),
                'name': heading['name'],
                'items': ''.join(items),
                'align': 'dropdown-menu-end' if menu['align'] == 'right' else '',
            }
            menu_items.append(
                app['templates']['navbar-menu.html'].safe_substitute(parts)
            )
        menu_structure[menu['align']].append(''.join(menu_items))

    return app['templates']['navbar.html'].safe_substitute({
        'title': app['config']['app']['name'],
        'leftmenu': '\n'.join(menu_structure['left']),
        'rightmenu': '\n'.join(menu_structure['right']),
    })


def init_app(app):
    """Populate *app* with reporter state and register its routes.

    Stores the report registry, start time, run statistics and rendered menu
    on *app*, mounts ``subapps.admin`` under ``/admin`` and adds the route
    table defined below. The handlers read ``config``, ``templates``, ``cur``
    and ``menu`` through ``request.config_dict`` and ``prefix`` from *app*.
    """
    app['reports'] = Reports('apps/reporter/reports')
    app['start'] = datetime.datetime.now()
    app['stats'] = {'reports_ran': 0, 'status': 'Normal'}
    app['menu'] = build_menu(app)

    async def db_query(query, params=()):
        """Run *query* on ``app['cur']``; return ``(rendered_sql, rows)``.

        Date and timedelta values are converted to ``str``; rows are expected
        to expose ``.items()``. Currently not referenced by any handler in
        this function.
        """
        await app['cur'].execute(query, params)
        q = app['cur'].mogrify(query, params)
        r = await app['cur'].fetchall()
        result = [
            {
                k: (str(v) if isinstance(v, (datetime.date, datetime.timedelta)) else v)
                for k, v in row.items()
            }
            for row in r
        ]
        return (q, result)

    routes = web.RouteTableDef()
    app.add_subapp('/admin', subapps.admin)

    @routes.get('')
    async def bare_redirect(request):
        """Redirect the bare mount path to ``app['prefix']``."""
        raise web.HTTPFound(app['prefix'])

    @routes.get('/')
    async def home(request):
        """Render the home page with version, uptime and run statistics."""
        await aiohttp_security.check_permission(request, 'user')
        uptime = datetime.datetime.now() - request.app['start']
        user = await aiohttp_security.authorized_userid(request)
        logout_button = 'Log out'
        role_specific = {
            'LOCAL': ['Log in'],
            'USER': [logout_button],
            'ADMIN': ['Admin console', logout_button],
        }
        parts = {
            'title': request.config_dict['config']['app']['name'],
            'menu': request.config_dict['menu'],
            'version': request.config_dict['config']['app']['version'],
            'reports': len(request.config_dict['reports']),
            # Rebuilt from days/seconds only, which drops the microseconds.
            'uptime': datetime.timedelta(days=uptime.days, seconds=uptime.seconds),
            'reports_ran': request.app['stats']['reports_ran'],
            'usertype': security.account_types[user],
            'role_specific': '\n    '.join(role_specific.get(user, [])),
        }
        return web.Response(
            text=request.config_dict['templates']['home.html'].safe_substitute(parts),
            content_type="text/html",
        )

    # Autocomplete suggestions
    @routes.get('/suggest/{table}')
    async def get_suggestions(request):
        """Return autocomplete strings for ``customer`` or ``supplier``.

        Any other table name yields an empty JSON list.
        """
        await aiohttp_security.check_permission(request, 'user')
        table = request.match_info['table']
        if table == 'customer':
            query = "SELECT Customer + ' - ' + Name FROM CompanyC.dbo.ArCustomer"
        elif table == 'supplier':
            query = "SELECT Supplier + ' - ' + SupplierName FROM CompanyC.dbo.ApSupplier"
        else:
            return web.json_response([])
        params = ()
        await request.config_dict['cur'].execute(query, params)
        q = request.config_dict['cur'].mogrify(query, params)
        results = await request.config_dict['cur'].fetchall()
        return web.json_response([row[0] for row in results], dumps=simplejson.dumps)

    @routes.get('/report/{report}')
    async def load_report(request):
        """Render the option form for a report.

        Control values come from the query string when present, otherwise
        from each control's configured default.

        Raises:
            web.HTTPNotFound: the report name is not registered.
        """
        name = request.match_info['report']
        if name not in request.config_dict['reports']:
            # HTTP exceptions are raised, not returned.
            raise web.HTTPNotFound()
        report = request.config_dict['reports'][name]
        permissions = report.get("permissions", 'user')
        await aiohttp_security.check_permission(request, permissions)

        templates = request.config_dict['templates']
        query = request.query
        today = datetime.date.today()
        # No parameters at all, or only the "run" flag: nothing was submitted
        # yet, so checkbox/option controls take their configured defaults.
        first_visit = len(query) == 0 or (len(query) == 1 and 'run' in query)

        options = []
        for option in report.get('options', []):
            group = option['group']
            controls = []
            for control in option['controls']:
                kind = control['type']
                if kind in ['checkbox', 'option']:
                    if first_visit:
                        checked = control.get('default', '')
                    elif kind == 'checkbox':
                        checked = 'checked' if f"{group}:{control['key']}" in query else ''
                    else:
                        checked = 'checked' if query.get(group) == control['key'] else ''
                    details = {
                        'group': group,
                        'key': control['key'],
                        'label': control['label'],
                        'checked': checked,
                    }
                    controls.append(
                        templates[f"options/{kind}.html"].safe_substitute(details)
                    )
                elif kind == 'select':
                    # TODO: select controls are not rendered yet.
                    pass
                elif kind in ['date', 'month']:
                    value = query.get(f"{group}:{control['key']}")
                    if value is None:
                        default = control.get('default')
                        if kind == 'date':
                            value = _default_date_value(default, today)
                        else:
                            value = _default_month_value(default, today)
                    details = {
                        'group': group,
                        'key': control['key'],
                        'label': control['label'],
                        'type': kind,
                        'value': _value_attr(value),
                        'suggest': '',
                    }
                    controls.append(
                        templates['options/date.html'].safe_substitute(details)
                    )
                elif kind == 'number':
                    value = query.get(f"{group}:{control['key']}")
                    if value is None:
                        value = _default_number_value(control.get('default'), today)
                    o_min = control.get('min')
                    o_max = control.get('max')
                    details = {
                        'group': group,
                        'key': control['key'],
                        'label': control['label'],
                        'type': kind,
                        # "is not None" so that a configured bound of 0 is kept.
                        'min': f"min='{o_min}'" if o_min is not None else '',
                        'max': f"max='{o_max}'" if o_max is not None else '',
                        'value': _value_attr(value),
                    }
                    controls.append(
                        templates['options/number.html'].safe_substitute(details)
                    )
                elif kind in ['text']:
                    _id = f"{group}:{control['key']}"
                    if _id in query:
                        # NOTE(review): request.query is presumably already
                        # percent-decoded by aiohttp - confirm this second
                        # unquote is intended.
                        value = urllib.parse.unquote(query[_id])
                    else:
                        value = control.get('default', '')
                    details = {
                        'group': group,
                        'key': control['key'],
                        'label': control['label'],
                        'type': kind,
                        'value': _value_attr(value) if value else '',
                    }
                    suggest = control.get('suggest')
                    if suggest:
                        controls.append(
                            templates['options/search.html'].safe_substitute(
                                details | {'suggest': ",".join(suggest)}
                            )
                        )
                    else:
                        # Plain text inputs are rendered with the date template.
                        controls.append(
                            templates['options/date.html'].safe_substitute(details)
                        )
            helper = templates['options/checkbox-helper.html'].safe_substitute(
                {'group': group}
            )
            fieldset = {
                'group': group,
                'helper': helper if 'helper' in option else '',
                'name': option['name'],
                'controls': ''.join(controls),
            }
            options.append(
                templates['options/fieldset.html'].safe_substitute(fieldset)
            )

        parts = {
            'title': f"{report['title']} - {request.config_dict['config']['app']['name']}",
            'report_title': report['title'],
            'report_description': report['description'].replace('\n', '\n    '),
            'options': ''.join(options),
            'menu': request.config_dict['menu'],
            'autorun': 'true' if 'run' in query else 'false',
        }
        return web.Response(
            text=templates['report.html'].safe_substitute(parts),
            content_type="text/html",
        )

    @routes.post('/report/{report}')
    async def run_report(request):
        """Run a report's query with the posted options; respond with JSON.

        Posted fields named ``group:key`` are nested under ``group``; other
        fields are passed through at the top level.

        Raises:
            web.HTTPNotFound: the report name is not registered.
        """
        name = request.match_info['report']
        if name not in request.config_dict['reports']:
            # HTTP exceptions are raised, not returned.
            raise web.HTTPNotFound()
        report = request.config_dict['reports'][name]
        permissions = report.get("permissions", 'user')
        await aiohttp_security.check_permission(request, permissions)

        user_options = {}
        postdata = await request.post()
        for fieldname, value in postdata.items():
            if ':' in fieldname:
                # Split on the first colon only, so a key that itself contains
                # a colon no longer raises ValueError on unpacking.
                group, key = fieldname.split(':', 1)
                user_options.setdefault(group, {})[key] = value
            else:
                user_options[fieldname] = value

        query_response = await report['query'](request.config_dict['cur'], user_options)
        report_options = {
            'formatting': report.get('formatting', True),
            'group_by': report.get('group_by'),
        }
        if query_response['status'] == 'ok':
            request.app['stats']['reports_ran'] += 1
            return web.json_response({
                'status': 'ok',
                'title': report['title'],
                'period': query_response.get('period'),
                'columns': query_response.get('columns', report.get('columns')),
                'subcolumns': report.get('subcolumns'),
                'rows': query_response['rows'],
                'subrows': query_response.get('subrows'),
                'options': report_options,
            }, dumps=simplejson.dumps)
        else:
            return web.json_response(
                {'status': 'error', 'errors': query_response['errors']}
            )

    app.add_routes(routes)