77 lines
No EOL
2.2 KiB
Python
77 lines
No EOL
2.2 KiB
Python
from aiohttp import web
|
|
import aiohttp_session
|
|
import aiohttp_security
|
|
|
|
import base64
|
|
#from cryptography import fernet
|
|
from aiohttp_session.cookie_storage import EncryptedCookieStorage
|
|
|
|
account_types = {
|
|
'LOCAL': 'User (on local network)',
|
|
'USER': 'User (entered password)',
|
|
'ADMIN': 'Administrator'
|
|
}
|
|
|
|
@web.middleware
|
|
async def redirect_to_login(request, handler):
|
|
try:
|
|
return await handler(request)
|
|
except web.HTTPException as ex:
|
|
if ex.status in (401, 403):
|
|
raise web.HTTPFound(f'/log/in?status={ex.status}&url={request.url}')
|
|
else:
|
|
raise
|
|
|
|
# except Exception:
|
|
# return await handle_500(request)
|
|
|
|
def try_password(password):
|
|
passwords = {
|
|
'userpass': 'USER',
|
|
'adminpass': 'ADMIN'
|
|
}
|
|
return passwords.get(password)
|
|
|
|
def get_cookie_storage():
|
|
# chosen by fair dice roll ( fernet.Fernet.generate_key() )
|
|
# guaranteed to be random.
|
|
fernet_key = b'_TxCY776Q1GtN6dFSvAuhqSW6O9gEI1MZL8hYoK92DA='
|
|
secret_key = base64.urlsafe_b64decode(fernet_key)
|
|
return EncryptedCookieStorage(secret_key, cookie_name="WEB_APP_DEMOS")
|
|
|
|
class LocalSessionIdentityPolicy(aiohttp_security.abc.AbstractIdentityPolicy):
|
|
def __init__(self, session_key = 'auth'):
|
|
self._session_key = session_key
|
|
|
|
async def identify(self, request):
|
|
session = await aiohttp_session.get_session(request)
|
|
key = session.get(self._session_key)
|
|
if key:
|
|
return key
|
|
|
|
real_ip = request.headers.get('X-Real-IP')
|
|
if real_ip is not None and real_ip != '10.69.0.1' and (real_ip == '127.0.0.1' or real_ip.startswith('10.')):
|
|
return 'LOCAL'
|
|
|
|
async def remember(self, request, response, identity, **kwargs):
|
|
session = await aiohttp_session.get_session(request)
|
|
session[self._session_key] = identity
|
|
|
|
async def forget(self, request, response):
|
|
session = await aiohttp_session.get_session(request)
|
|
session.pop(self._session_key, None)
|
|
|
|
class SimplePasswordAuthPolicy(aiohttp_security.abc.AbstractAuthorizationPolicy):
|
|
async def authorized_userid(self, identity):
|
|
if identity in account_types:
|
|
return identity
|
|
|
|
async def permits(self, identity, permission, context=None):
|
|
if identity == 'LOCAL':
|
|
return permission in ('user',)
|
|
elif identity == 'USER':
|
|
return permission in ('user',)
|
|
elif identity == 'ADMIN':
|
|
return True
|
|
|
|
return False |