diff --git a/frappe/commands/redis.py b/frappe/commands/redis.py index c608296cac..38a46c2142 100644 --- a/frappe/commands/redis.py +++ b/frappe/commands/redis.py @@ -1,7 +1,6 @@ import os import click -import redis import frappe from frappe.utils.rq import RedisQueue @@ -9,16 +8,18 @@ from frappe.installer import update_site_config @click.command('create-rq-users') @click.option('--set-admin-password', is_flag=True, default=False, help='Set new Redis admin(default user) password') -@click.option('--reset-passwords', is_flag=True, default=False, help='Remove all existing passwords') -def create_rq_users(set_admin_password=False, reset_passwords=False): +@click.option('--use-rq-auth', is_flag=True, default=False, help='Enable Redis authentication for sites') +def create_rq_users(set_admin_password=False, use_rq_auth=False): """Create Redis Queue users and add to acl and app configs. acl config file will be used by redis server while starting the server and app config is used by app while connecting to redis server. """ acl_file_path = os.path.abspath('../config/redis_queue.acl') - acl_list, user_credentials = RedisQueue.gen_acl_list( - reset_passwords=reset_passwords, set_admin_password=set_admin_password) + + with frappe.init_site(): + acl_list, user_credentials = RedisQueue.gen_acl_list( + set_admin_password=set_admin_password) with open(acl_file_path, 'w') as f: f.writelines([acl+'\n' for acl in acl_list]) @@ -29,18 +30,22 @@ def create_rq_users(set_admin_password=False, reset_passwords=False): site_config_path=common_site_config_path) update_site_config("rq_password", user_credentials['bench'][1], validate=False, site_config_path=common_site_config_path) + update_site_config("use_rq_auth", use_rq_auth, validate=False, + site_config_path=common_site_config_path) + + click.secho('* ACL and site configs are updated with new user credentials. ' + 'Please restart Redis Queue server to enable namespaces.', + fg='green') if set_admin_password: env_key = 'RQ_ADMIN_PASWORD' - click.secho('Redis admin password is successfully set up. ' + click.secho('* Redis admin password is successfully set up. ' 'Include below line in .bashrc file for system to use', - fg='green' - ) + fg='green') click.secho(f"`export {env_key}={user_credentials['default'][1]}`") click.secho('NOTE: Please save the admin password as you ' 'can not access redis server without the password', - fg='yellow' - ) + fg='yellow') commands = [ diff --git a/frappe/commands/scheduler.py b/frappe/commands/scheduler.py index d69ebb3024..f82473fd55 100755 --- a/frappe/commands/scheduler.py +++ b/frappe/commands/scheduler.py @@ -172,9 +172,13 @@ def start_scheduler(): @click.command('worker') @click.option('--queue', type=str) @click.option('--quiet', is_flag = True, default = False, help = 'Hide Log Outputs') -def start_worker(queue, quiet = False): +@click.option('-u', '--rq-username', default=None, help='Redis ACL user') +@click.option('-p', '--rq-password', default=None, help='Redis ACL user password') +def start_worker(queue, quiet = False, rq_username=None, rq_password=None): + """Site is used to find redis credentals. + """ from frappe.utils.background_jobs import start_worker - start_worker(queue, quiet = quiet) + start_worker(queue, quiet = quiet, rq_username=rq_username, rq_password=rq_password) @click.command('ready-for-migration') @click.option('--site', help='site name') diff --git a/frappe/core/page/background_jobs/background_jobs.py b/frappe/core/page/background_jobs/background_jobs.py index 847b23bd3e..1f3555e351 100644 --- a/frappe/core/page/background_jobs/background_jobs.py +++ b/frappe/core/page/background_jobs/background_jobs.py @@ -4,12 +4,12 @@ import json from typing import TYPE_CHECKING, Dict, List -from rq import Queue, Worker +from rq import Worker import frappe from frappe import _ from frappe.utils import convert_utc_to_user_timezone, format_datetime -from frappe.utils.background_jobs import get_redis_conn +from frappe.utils.background_jobs import get_redis_conn, get_queues from frappe.utils.scheduler import is_scheduler_inactive if TYPE_CHECKING: @@ -29,7 +29,7 @@ def get_info(show_failed=False) -> List[Dict]: show_failed = json.loads(show_failed) conn = get_redis_conn() - queues = Queue.all(conn) + queues = get_queues() workers = Worker.all(conn) jobs = [] @@ -75,7 +75,7 @@ def get_info(show_failed=False) -> List[Dict]: @frappe.whitelist() def remove_failed_jobs(): conn = get_redis_conn() - queues = Queue.all(conn) + queues = get_queues() for queue in queues: fail_registry = queue.failed_job_registry for job_id in fail_registry.get_job_ids(): diff --git a/frappe/test_runner.py b/frappe/test_runner.py index 0c30fbbd00..8112362f34 100644 --- a/frappe/test_runner.py +++ b/frappe/test_runner.py @@ -56,6 +56,7 @@ def main(app=None, module=None, doctype=None, verbose=False, tests=(), frappe.clear_cache() frappe.utils.scheduler.disable_scheduler() set_test_email_config() + frappe.conf.update({'bench_id': 'test_bench', 'use_rq_auth': False}) if not frappe.flags.skip_before_tests: if verbose: diff --git a/frappe/tests/test_background_jobs.py b/frappe/tests/test_background_jobs.py index 88783f14f1..48e0dd2ee9 100644 --- a/frappe/tests/test_background_jobs.py +++ b/frappe/tests/test_background_jobs.py @@ -4,7 +4,7 @@ from rq import Queue import frappe from frappe.core.page.background_jobs.background_jobs import remove_failed_jobs -from frappe.utils.background_jobs import get_redis_conn +from frappe.utils.background_jobs import get_redis_conn, rename_queue import time @@ -17,14 +17,14 @@ class TestBackgroundJobs(unittest.TestCase): queues = Queue.all(conn) for queue in queues: - if queue.name == "short": + if queue.name == rename_queue("short"): fail_registry = queue.failed_job_registry self.assertGreater(fail_registry.count, 0) remove_failed_jobs() for queue in queues: - if queue.name == "short": + if queue.name == rename_queue("short"): fail_registry = queue.failed_job_registry self.assertEqual(fail_registry.count, 0) diff --git a/frappe/tests/test_redis.py b/frappe/tests/test_redis.py new file mode 100644 index 0000000000..72af1ac699 --- /dev/null +++ b/frappe/tests/test_redis.py @@ -0,0 +1,70 @@ +import unittest +import functools + +import redis + +import frappe +from frappe.utils import get_bench_id +from frappe.utils.rq import RedisQueue +from frappe.utils.background_jobs import get_redis_conn + +def version_tuple(version): + return tuple(map(int, (version.split(".")))) + +def skip_if_redis_version_lt(version): + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + conn = get_redis_conn() + redis_version = conn.execute_command('info')['redis_version'] + if version_tuple(redis_version) < version_tuple(version): + return + return func(*args, **kwargs) + return wrapper + return decorator + +class TestRedisAuth(unittest.TestCase): + @skip_if_redis_version_lt('6.0') + def test_rq_gen_acllist(self): + """Make sure that ACL list is genrated + """ + acl_list = RedisQueue.gen_acl_list() + self.assertEqual(acl_list[1]['bench'][0], get_bench_id()) + + @skip_if_redis_version_lt('6.0') + def test_adding_redis_user(self): + acl_list = RedisQueue.gen_acl_list() + username, password = acl_list[1]['bench'] + conn = get_redis_conn() + + conn.acl_deluser(username) + _ = RedisQueue(conn).add_user(username, password) + self.assertTrue(conn.acl_getuser(username)) + conn.acl_deluser(username) + + @skip_if_redis_version_lt('6.0') + def test_rq_namespace(self): + """Make sure that user can access only their respective namespace. + """ + # Current bench ID + bench_id = frappe.conf.get('bench_id') + conn = get_redis_conn() + conn.set('rq:queue:test_bench1:abc', 'value') + conn.set(f'rq:queue:{bench_id}:abc', 'value') + + # Create new Redis Queue user + tmp_bench_id = 'test_bench1' + username, password = tmp_bench_id, 'password1' + conn.acl_deluser(username) + frappe.conf.update({'bench_id': tmp_bench_id}) + _ = RedisQueue(conn).add_user(username, password) + test_bench1_conn = RedisQueue.get_connection(username, password) + + self.assertEqual(test_bench1_conn.get('rq:queue:test_bench1:abc'), b'value') + + # User should not be able to access queues apart from their bench queues + with self.assertRaises(redis.exceptions.NoPermissionError): + test_bench1_conn.get(f'rq:queue:{bench_id}:abc') + + frappe.conf.update({'bench_id': bench_id}) + conn.acl_deluser(username) diff --git a/frappe/utils/__init__.py b/frappe/utils/__init__.py index bf139173d6..80c6cda98c 100644 --- a/frappe/utils/__init__.py +++ b/frappe/utils/__init__.py @@ -384,7 +384,7 @@ def get_bench_path(): return os.path.realpath(os.path.join(os.path.dirname(frappe.__file__), '..', '..', '..')) def get_bench_id(): - return frappe.local.conf.get('bench_id', 'DefaultBench') + return frappe.get_conf().get('bench_id', 'DefaultBench') def get_site_id(site=None): return f"{site or frappe.local.site}@{get_bench_id()}" diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 8456835ca7..4241c95c5d 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -1,13 +1,21 @@ +import os +import socket +import time +from uuid import uuid4 +from collections import defaultdict + + import redis +from typing import List from rq import Connection, Queue, Worker from rq.logutils import setup_loghandlers -from frappe.utils import cstr -from collections import defaultdict + import frappe -import os, socket, time from frappe import _ -from uuid import uuid4 import frappe.monitor +from frappe.utils import cstr, get_bench_id +from frappe.utils.rq import RedisQueue +from frappe.utils.commands import log default_timeout = 300 @@ -131,21 +139,22 @@ def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True, if is_async: frappe.destroy() -def start_worker(queue=None, quiet = False): +def start_worker(queue=None, quiet = False, rq_username=None, rq_password=None): '''Wrapper to start rq worker. Connects to redis and monitors these queues.''' with frappe.init_site(): # empty init is required to get redis_queue from common_site_config.json - redis_connection = get_redis_conn() + redis_connection = get_redis_conn(username=rq_username, password=rq_password) + queues = get_queue_list(queue, build_queue_name=True) + queue_name = queue and rename_queue(queue) if os.environ.get('CI'): setup_loghandlers('ERROR') with Connection(redis_connection): - queues = get_queue_list(queue) logging_level = "INFO" if quiet: logging_level = "WARNING" - Worker(queues, name=get_worker_name(queue)).work(logging_level = logging_level) + Worker(queues, name=get_worker_name(queue_name)).work(logging_level = logging_level) def get_worker_name(queue): '''When limiting worker to a specific queue, also append queue name to default worker name''' @@ -186,7 +195,7 @@ def get_jobs(site=None, queue=None, key='method'): return jobs_per_site -def get_queue_list(queue_list=None): +def get_queue_list(queue_list=None, build_queue_name=False): '''Defines possible queues. Also wraps a given queue in a list after validating.''' default_queue_list = list(queue_timeout) if queue_list: @@ -195,11 +204,9 @@ def get_queue_list(queue_list=None): for queue in queue_list: validate_queue(queue, default_queue_list) - - return queue_list - else: - return default_queue_list + queue_list = default_queue_list + return [rename_queue(q) for q in queue_list] if build_queue_name else queue_list def get_workers(queue): '''Returns a list of Worker objects tied to a queue object''' @@ -218,7 +225,7 @@ def get_running_jobs_in_queue(queue): def get_queue(queue, is_async=True): '''Returns a Queue object tied to a redis connection''' validate_queue(queue) - return Queue(queue, connection=get_redis_conn(), is_async=is_async) + return Queue(rename_queue(queue), connection=get_redis_conn(), is_async=is_async) def validate_queue(queue, default_queue_list=None): if not default_queue_list: @@ -227,7 +234,7 @@ def validate_queue(queue, default_queue_list=None): if queue not in default_queue_list: frappe.throw(_("Queue should be one of {0}").format(', '.join(default_queue_list))) -def get_redis_conn(): +def get_redis_conn(username=None, password=None): if not hasattr(frappe.local, 'conf'): raise Exception('You need to call frappe.init') @@ -236,11 +243,50 @@ def get_redis_conn(): global redis_connection - if not redis_connection: - redis_connection = redis.from_url(frappe.local.conf.redis_queue) + cred = frappe._dict() + if frappe.conf.get('use_rq_auth'): + if username: + cred['username'] = username + cred['password'] = password + else: + cred['username'] = frappe.get_site_config().rq_username or get_bench_id() + cred['password'] = frappe.get_site_config().rq_password + + elif os.environ.get('RQ_ADMIN_PASWORD'): + cred['username'] = 'default' + cred['password'] = os.environ.get('RQ_ADMIN_PASWORD') + try: + redis_connection = RedisQueue.get_connection(**cred) + except (redis.exceptions.AuthenticationError, redis.exceptions.ResponseError): + log(f'Wrong credentials used for {cred.username or "default user"}. ' + 'You can reset credentials using `bench create-rq-users` CLI and restart the server', + colour='red') + raise + except Exception: + log(f'Please make sure that Redis Queue runs @ {frappe.get_conf().redis_queue}', colour='red') + raise return redis_connection +def get_queues() -> List[Queue]: + """Get all the queues linked to the current bench. + """ + queues = Queue.all(connection=get_redis_conn()) + return [q for q in queues if is_queue_accessible(q)] + +def rename_queue(qname: str) -> str: + """Rename qname by adding bench name as prefix. + + Renamed queues are useful to define namespaces of customers. + """ + return f"{get_bench_id()}:{qname}" + +def is_queue_accessible(qobj: Queue) -> bool: + """Checks whether queue is relate to current bench or not. + """ + accessible_queues = [rename_queue(q) for q in list(queue_timeout)] + return qobj.name in accessible_queues + def enqueue_test_job(): enqueue('frappe.utils.background_jobs.test_job', s=100) diff --git a/frappe/utils/rq.py b/frappe/utils/rq.py index 5e9b9dcd5d..b344b0caa5 100644 --- a/frappe/utils/rq.py +++ b/frappe/utils/rq.py @@ -1,8 +1,7 @@ import redis import frappe -from frappe.utils import get_site_id, get_bench_id, random_string - +from frappe.utils import get_bench_id, random_string class RedisQueue: def __init__(self, conn): @@ -17,9 +16,10 @@ class RedisQueue: return frappe._dict(user_settings) if is_created else {} @classmethod - def get_connection(cls, username='default', password=None): - domain = frappe.local.conf.redis_queue.split("redis://", 1)[-1] - url = f"redis://{username}:{password or ''}@{domain}" + def get_connection(cls, username=None, password=None): + rq_url = frappe.local.conf.redis_queue + domain = rq_url.split("redis://", 1)[-1] + url = (username and f"redis://{username}:{password or ''}@{domain}") or rq_url conn = redis.from_url(url) conn.ping() return conn @@ -63,25 +63,21 @@ class RedisQueue: return ['+@all', '-@admin'] @classmethod - def gen_acl_list(cls, reset_passwords=False, set_admin_password=False): + def gen_acl_list(cls, set_admin_password=False): """Generate list of ACL users needed for this branch. This list contains default ACL user and the bench ACL user(used by all sites incase of ACL is enabled). """ - with frappe.init_site(): - bench_username = get_bench_id() - bench_user_rules = cls.get_acl_key_rules(include_key_prefix=True) + cls.get_acl_command_rules() - + bench_username = get_bench_id() + bench_user_rules = cls.get_acl_key_rules(include_key_prefix=True) + cls.get_acl_command_rules() bench_user_rule_str = ' '.join(bench_user_rules).strip() bench_user_password = random_string(20) - bench_user_resetpass = (reset_passwords and 'resetpass') or '' default_username = 'default' _default_user_password = random_string(20) if set_admin_password else '' default_user_password = '>'+_default_user_password if _default_user_password else 'nopass' - default_user_resetpass = (reset_passwords and set_admin_password and 'resetpass') or '' return [ - f'user {default_username} on {default_user_password} {default_user_resetpass} ~* &* +@all', - f'user {bench_username} on >{bench_user_password} {bench_user_resetpass} {bench_user_rule_str}' + f'user {default_username} on {default_user_password} ~* &* +@all', + f'user {bench_username} on >{bench_user_password} {bench_user_rule_str}' ], {'bench': (bench_username, bench_user_password), 'default': (default_username, _default_user_password)}