import asyncio from aiohttp import web import aiohttp_security from datetime import datetime import os import psutil import random app = web.Application() routes = web.RouteTableDef() @routes.get('') async def admin_get(request): await aiohttp_security.check_permission(request, 'admin') template = request.config_dict['templates']['admin.html'] parts = { 'title': request.config_dict['config']['app']['name'] + ' Admin', 'menu': request.config_dict['menu'] } return web.Response(text=template.safe_substitute(parts), content_type="text/html") @routes.post('/api') async def api(request): await aiohttp_security.check_permission(request, 'admin') timestamp = datetime.now().strftime('%H:%M:%S') postdata = await request.post() command = postdata.get('command').split(' ') if command[0] == "status": try: count_tables = "SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'" results = await request.config_dict['query'](count_tables) tables = results[0][0] db_status = 'OK' except Exception as e: tables = 0 db_status = str(e).rstrip('.') core_count = psutil.cpu_count(logical=False) thread_count = psutil.cpu_count() cpu_load = psutil.cpu_percent() ram_stats = psutil.virtual_memory() ram_total = ram_stats[0] ram_used = ram_stats[3] data = f"""\ Web server status: {request.config_dict['stats']['status']}, started {request.config_dict['start']:%Y-%m-%d %H:%M:%S}. Database at {request.config_dict['config']['database']['host']}: {db_status}, access to {tables:,} tables. Load on {core_count} cores, {thread_count} threads: {cpu_load}%. Using {ram_used/1024**3:.1f} GiB of {ram_total/1024**3:.1f} GiB memory.""" return web.json_response({'status': 'ok', 'data': data, 'timestamp': timestamp}) elif command[0] == "shutdown": #request.config_dict['stats']['status'] = 'Shutting down' delay = 5 if len(command) == 1 else int(command[1]) delay = max(delay, 0) #asyncio.create_task(shutdown(request.config_dict['runner'], request.config_dict['waiter'], delay)) return web.json_response({'status': 'ok', 'data': f'This would shut down the app in {delay} seconds, but the feature is disabled for this demo.', 'timestamp': timestamp}) elif command[0] == "fortune": magic_8_ball = [ "It is certain", "It is decidedly so", "Without a doubt", "Yes definitely", "You may rely on it", "As I see it, yes", "Most likely", "Outlook good", "Yes", "Signs point to yes", "Reply hazy, try again", "Ask again later", "Better not tell you now", "Cannot predict now", "Concentrate and ask again", "Don't count on it", "My reply is no", "My sources say no", "Outlook not so good", "Very doubtful" ] return web.json_response({'status': 'ok', 'data': f'Your magic 8-ball prediction: {random.choice(magic_8_ball)}', 'timestamp': timestamp}) elif command[0] == "help": message = """\ status - Prints an overview of system operation and resource usage shutdown [delay] - Shuts down the application after a timeout in seconds (default 5, disabled for this demo) help - Prints this message""" return web.json_response({'status': 'ok', 'data': message, 'timestamp': timestamp}) else: return web.json_response({'status': 'error', 'data': f'Unrecognized command "{command[0]}", try "help".', 'timestamp': timestamp}) async def shutdown(runner, waiter, delay=0): await asyncio.sleep(delay) await runner.cleanup() waiter.set() app.add_routes(routes)