rgb-dossier/app.py
2026-01-15 17:58:34 -07:00

175 lines
5.1 KiB
Python

import json
import os
from pathlib import Path
from typing import Any
from aiohttp import web
from aiohttp_session import get_session, setup as setup_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage
from cryptography import fernet
from jinja2 import Environment, FileSystemLoader, select_autoescape
# Filesystem layout. Every path is anchored at the directory that contains
# this file, so lookups do not depend on the process's working directory.
BASE_DIR = Path(__file__).resolve().parent
TEMPLATES_DIR = BASE_DIR / "templates"
DATA_DIR = BASE_DIR / "data"
STATIC_DIR = BASE_DIR / "static"
AGENTS_JSON = DATA_DIR / "agents.json"

# Login credentials and the session-cookie secret.
#
# Each value can be overridden through an environment variable (the same
# mechanism the entry point uses for HOST/PORT); the literals are only
# fallbacks. ``or`` is used instead of a getenv default so that a variable
# set to the empty string cannot produce an empty password or key.
# NOTE(review): the fallback secrets are committed to source control; set the
# environment variables (and rotate the key) for any real deployment.
#
# The user name is lower-cased because the login handler compares it against
# the lower-cased submitted user name.
LOGIN_USER = (os.getenv("RGB_LOGIN_USER") or "tangerine@rgb.gov").lower()
LOGIN_PASS = os.getenv("RGB_LOGIN_PASS") or "*******"
# Handed to EncryptedCookieStorage when the application is created.
# NOTE(review): presumably a urlsafe-base64 Fernet key -- confirm.
_SESSION_KEY = os.getenv("RGB_SESSION_KEY") or "vji14btWrpmw9OR04u7OuBcFTebBYVrhi2PZPso1WII="
# Jinja filter
def text_color_for_bg(hex_color: str) -> str:
    """Choose black or white text for a given background colour.

    Registered as a Jinja filter. The colour is converted to relative
    luminance (sRGB channels linearised, then weighted 0.2126 / 0.7152 /
    0.0722) and compared against 0.179, the luminance at which white and
    black text give equal contrast.

    Args:
        hex_color: Background colour as hex digits, with or without a leading
            ``#``. Accepts ``rrggbb`` and the CSS shorthand ``rgb``; any
            digits after the first three channels (e.g. alpha) are ignored.
            Surrounding whitespace is tolerated.

    Returns:
        ``"#ffffff"`` for dark backgrounds, ``"#000000"`` for light ones.

    Raises:
        ValueError: If the value does not contain parseable hex channels.
    """
    digits = hex_color.strip().lstrip("#")
    # CSS shorthand: "#abc" means "#aabbcc" (and "#abcd" means "#aabbccdd").
    # Without this expansion the blue channel slice is empty and int() fails.
    if len(digits) in (3, 4):
        digits = "".join(ch * 2 for ch in digits)

    def to_linear(channel: float) -> float:
        """Undo the sRGB transfer curve for one channel in [0, 1]."""
        if channel <= 0.04045:
            return channel / 12.92
        return ((channel + 0.055) / 1.055) ** 2.4

    red, green, blue = (
        to_linear(int(digits[start:start + 2], 16) / 255.0) for start in (0, 2, 4)
    )
    luminance = 0.2126 * red + 0.7152 * green + 0.0722 * blue
    return "#ffffff" if luminance < 0.179 else "#000000"
# Module-wide Jinja environment: templates are loaded from TEMPLATES_DIR and
# autoescaping is enabled for "html" and "xml" templates.
_jinja = Environment(
    autoescape=select_autoescape(["html", "xml"]),
    loader=FileSystemLoader(str(TEMPLATES_DIR)),
)
# Expose the contrast helper to templates under the filter name "text_color".
_jinja.filters.update(text_color=text_color_for_bg)
def render_template(template_name: str, *, context: dict[str, Any] | None = None) -> web.Response:
    """Render a template from the shared Jinja environment as an HTML response.

    Args:
        template_name: Name of the template to look up in the environment.
        context: Variables passed to the template; ``None`` or an empty
            mapping renders the template with no variables.

    Returns:
        A ``web.Response`` whose body is the rendered text, served as
        ``text/html``.
    """
    variables = {} if not context else context
    template = _jinja.get_template(template_name)
    body = template.render(**variables)
    return web.Response(text=body, content_type="text/html")
# Agents data
# Filled on the first successful load_agents() call; None means "not loaded yet".
_AGENTS_CACHE: list[dict[str, Any]] | None = None


def load_agents() -> list[dict[str, Any]]:
    """Return the agent records from ``AGENTS_JSON``, loading them at most once.

    The result is memoised in ``_AGENTS_CACHE``. A missing file yields (and
    caches) an empty list. Entries that are not JSON objects, or whose
    ``codename`` is empty after stripping whitespace, are dropped.

    Returns:
        The cached list of agent dicts (the same list object on every call).

    Raises:
        ValueError: If the file's top-level JSON value is not a list. Nothing
            is cached in that case, so the next call retries the load.
    """
    global _AGENTS_CACHE

    if _AGENTS_CACHE is None:
        if AGENTS_JSON.exists():
            with AGENTS_JSON.open("r", encoding="utf-8") as handle:
                payload = json.load(handle)
            if not isinstance(payload, list):
                raise ValueError("data/agents.json must contain a JSON list of agent objects")
            _AGENTS_CACHE = [
                entry
                for entry in payload
                if isinstance(entry, dict) and str(entry.get("codename", "")).strip()
            ]
        else:
            _AGENTS_CACHE = []
    return _AGENTS_CACHE
def get_agent_by_codename(codename: str) -> dict[str, Any] | None:
    """Find the first loaded agent whose codename matches *codename*.

    Both sides of the comparison are whitespace-stripped; the match is
    case-sensitive. A falsy *codename* is treated as the empty string.

    Returns:
        The matching agent dict, or ``None`` when no agent matches.
    """
    wanted = (codename or "").strip()
    matches = (
        agent
        for agent in load_agents()
        if str(agent.get("codename", "")).strip() == wanted
    )
    return next(matches, None)
# Auth helpers
async def is_authed(request: web.Request) -> bool:
    """Report whether the request's session carries a truthy ``authed`` flag."""
    current_session = await get_session(request)
    flag = current_session.get("authed")
    return True if flag else False
def redirect(location: str) -> web.Response:
    """Build a ``302 Found`` response that sends the client to *location*.

    The response is constructed directly rather than by returning a
    ``web.HTTPFound`` instance: HTTP exception objects are meant to be raised,
    and returning one from a handler is deprecated in aiohttp. A plain
    ``web.Response`` also matches this function's return annotation, so
    handlers can keep writing ``return redirect(...)``.

    Args:
        location: Value for the ``Location`` header.

    Returns:
        A body-less 302 response.
    """
    return web.Response(status=302, headers={"Location": location})
# Routes / Handlers
async def home(request: web.Request) -> web.Response:
    """GET / -- show the login form, or skip to /agents when already signed in.

    The ``next`` query parameter (default ``/agents``) is handed to the
    template together with ``error=None``.
    """
    already_signed_in = await is_authed(request)
    if not already_signed_in:
        target = request.query.get("next", "/agents")
        return render_template("login.html", context={"next": target, "error": None})
    return redirect("/agents")
def _safe_next_url(candidate: str, default: str = "/agents") -> str:
    """Return *candidate* if it is a same-site absolute path, else *default*.

    Rejected: anything not starting with a single ``/`` (absolute URLs,
    scheme-relative ``//host`` URLs, empty strings), anything containing a
    backslash (some browsers treat ``/\\host`` like ``//host``), and anything
    containing ASCII control characters (header injection; browsers also strip
    tabs/newlines, which can turn ``/<TAB>/host`` into ``//host``).
    """
    is_local_path = candidate.startswith("/") and not candidate.startswith("//")
    has_bad_chars = "\\" in candidate or any(ord(ch) < 0x20 for ch in candidate)
    if is_local_path and not has_bad_chars:
        return candidate
    return default


async def login(request: web.Request) -> web.Response:
    """POST /login -- check the submitted credentials and start a session.

    Reads ``username``, ``password`` and ``next`` from the form body. On
    success the session gets ``authed=True`` and ``user=<submitted name>`` and
    the client is redirected to ``next``; otherwise the login page is rendered
    again with the error ``ACCESS DENIED``.

    ``next`` is attacker-controllable, so it is restricted to same-site paths
    (falling back to ``/agents``) to prevent an open redirect. The password is
    compared in constant time to avoid leaking a matching prefix via timing.
    """
    import hmac  # local import keeps this block self-contained

    form = await request.post()
    username = str(form.get("username", ""))
    password = str(form.get("password", ""))
    next_url = _safe_next_url(str(form.get("next", "/agents")))

    user_ok = username.lower() == LOGIN_USER
    # compare_digest only accepts ASCII str, so compare UTF-8 bytes instead.
    pass_ok = hmac.compare_digest(password.encode("utf-8"), LOGIN_PASS.encode("utf-8"))

    if user_ok and pass_ok:
        session = await get_session(request)
        session["authed"] = True
        session["user"] = username
        return redirect(next_url)
    return render_template("login.html", context={"next": next_url, "error": "ACCESS DENIED"})
async def logout(request: web.Request) -> web.Response:
    """
    POST /logout: invalidate the caller's session, then redirect to "/".
    """
    current_session = await get_session(request)
    current_session.invalidate()
    login_page = "/"
    return redirect(login_page)
async def agents_index(request: web.Request) -> web.Response:
    """GET /agents -- list every agent (login required).

    Unauthenticated callers are redirected to the login page with
    ``next=/agents``. Agents without a truthy ``photo`` are shown with
    ``photos/default.png``.

    The default is applied to a per-request copy: the list returned by
    ``load_agents()`` is the module-level cache, so writing the fallback into
    those dicts would silently alter the shared data for every later reader.
    """
    if not await is_authed(request):
        return redirect("/?next=/agents")

    agents = [
        agent if agent.get("photo") else {**agent, "photo": "photos/default.png"}
        for agent in load_agents()
    ]
    return render_template("agents.html", context={"agents": agents})
async def agent_detail(request: web.Request) -> web.Response:
    """GET /agent/{codename} -- show one agent's dossier (login required).

    Unauthenticated callers are redirected to the login page with ``next``
    pointing back at this agent. An agent without a truthy ``photo`` is shown
    with ``photos/default.png``.

    Raises:
        web.HTTPNotFound: If no agent has the requested codename.
    """
    from urllib.parse import quote  # local import keeps this block self-contained

    codename = request.match_info.get("codename", "")
    if not await is_authed(request):
        # Encode twice: once so the codename is a valid path segment, and once
        # more so the whole path survives as a single query-string value.
        # Unescaped, a codename containing "&", "#", "%" or a space would
        # truncate or corrupt the ``next`` target. Plain alphanumeric
        # codenames produce exactly the same URL as before.
        # NOTE(review): assumes match_info holds the percent-decoded
        # codename -- confirm against the aiohttp version in use.
        target = "/agent/" + quote(codename, safe="")
        return redirect("/?next=" + quote(target, safe="/"))

    agent = get_agent_by_codename(codename)
    if agent is None:
        raise web.HTTPNotFound(text="Agent not found")

    # Apply the photo fallback to a copy; ``agent`` is an entry of the shared
    # agents cache and should not be modified by a request handler.
    if not agent.get("photo"):
        agent = {**agent, "photo": "photos/default.png"}
    return render_template("agent.html", context={"agent": agent})
# App wiring
def create_app() -> web.Application:
    """Assemble the aiohttp application.

    Installs encrypted-cookie sessions keyed by ``_SESSION_KEY``, registers
    the page handlers, and serves ``STATIC_DIR`` under ``/static/``.
    """
    application = web.Application()
    setup_session(application, EncryptedCookieStorage(_SESSION_KEY))

    router = application.router
    # (registrar, path, handler), registered in this order.
    route_table = (
        (router.add_get, "/", home),
        (router.add_post, "/login", login),
        (router.add_post, "/logout", logout),
        (router.add_get, "/agents", agents_index),
        (router.add_get, "/agent/{codename}", agent_detail),
    )
    for register, path, handler in route_table:
        register(path, handler)

    router.add_static("/static/", path=str(STATIC_DIR), name="static")
    return application
if __name__ == "__main__":
    # Script entry point: the HOST / PORT environment variables override the
    # default bind address 127.0.0.1:9000.
    bind_host = os.environ.get("HOST", "127.0.0.1")
    bind_port = int(os.environ.get("PORT", "9000"))
    web.run_app(create_app(), host=bind_host, port=bind_port)