diff --git a/frappe/commands/__init__.py b/frappe/commands/__init__.py index be9d107025..9ed333d034 100644 --- a/frappe/commands/__init__.py +++ b/frappe/commands/__init__.py @@ -102,7 +102,9 @@ def get_commands(): from .site import commands as site_commands from .translate import commands as translate_commands from .utils import commands as utils_commands + from .redis import commands as redis_commands - return list(set(scheduler_commands + site_commands + translate_commands + utils_commands)) + all_commands = scheduler_commands + site_commands + translate_commands + utils_commands + redis_commands + return list(set(all_commands)) commands = get_commands() diff --git a/frappe/commands/redis.py b/frappe/commands/redis.py new file mode 100644 index 0000000000..38a46c2142 --- /dev/null +++ b/frappe/commands/redis.py @@ -0,0 +1,53 @@ +import os + +import click + +import frappe +from frappe.utils.rq import RedisQueue +from frappe.installer import update_site_config + +@click.command('create-rq-users') +@click.option('--set-admin-password', is_flag=True, default=False, help='Set new Redis admin(default user) password') +@click.option('--use-rq-auth', is_flag=True, default=False, help='Enable Redis authentication for sites') +def create_rq_users(set_admin_password=False, use_rq_auth=False): + """Create Redis Queue users and add to acl and app configs. + + acl config file will be used by redis server while starting the server + and app config is used by app while connecting to redis server. + """ + acl_file_path = os.path.abspath('../config/redis_queue.acl') + + with frappe.init_site(): + acl_list, user_credentials = RedisQueue.gen_acl_list( + set_admin_password=set_admin_password) + + with open(acl_file_path, 'w') as f: + f.writelines([acl+'\n' for acl in acl_list]) + + sites_path = os.getcwd() + common_site_config_path = os.path.join(sites_path, 'common_site_config.json') + update_site_config("rq_username", user_credentials['bench'][0], validate=False, + site_config_path=common_site_config_path) + update_site_config("rq_password", user_credentials['bench'][1], validate=False, + site_config_path=common_site_config_path) + update_site_config("use_rq_auth", use_rq_auth, validate=False, + site_config_path=common_site_config_path) + + click.secho('* ACL and site configs are updated with new user credentials. ' + 'Please restart Redis Queue server to enable namespaces.', + fg='green') + + if set_admin_password: + env_key = 'RQ_ADMIN_PASWORD' + click.secho('* Redis admin password is successfully set up. ' + 'Include below line in .bashrc file for system to use', + fg='green') + click.secho(f"`export {env_key}={user_credentials['default'][1]}`") + click.secho('NOTE: Please save the admin password as you ' + 'can not access redis server without the password', + fg='yellow') + + +commands = [ + create_rq_users +] diff --git a/frappe/commands/scheduler.py b/frappe/commands/scheduler.py index d69ebb3024..f82473fd55 100755 --- a/frappe/commands/scheduler.py +++ b/frappe/commands/scheduler.py @@ -172,9 +172,13 @@ def start_scheduler(): @click.command('worker') @click.option('--queue', type=str) @click.option('--quiet', is_flag = True, default = False, help = 'Hide Log Outputs') -def start_worker(queue, quiet = False): +@click.option('-u', '--rq-username', default=None, help='Redis ACL user') +@click.option('-p', '--rq-password', default=None, help='Redis ACL user password') +def start_worker(queue, quiet = False, rq_username=None, rq_password=None): + """Site is used to find redis credentals. + """ from frappe.utils.background_jobs import start_worker - start_worker(queue, quiet = quiet) + start_worker(queue, quiet = quiet, rq_username=rq_username, rq_password=rq_password) @click.command('ready-for-migration') @click.option('--site', help='site name') diff --git a/frappe/core/page/background_jobs/background_jobs.py b/frappe/core/page/background_jobs/background_jobs.py index 847b23bd3e..1f3555e351 100644 --- a/frappe/core/page/background_jobs/background_jobs.py +++ b/frappe/core/page/background_jobs/background_jobs.py @@ -4,12 +4,12 @@ import json from typing import TYPE_CHECKING, Dict, List -from rq import Queue, Worker +from rq import Worker import frappe from frappe import _ from frappe.utils import convert_utc_to_user_timezone, format_datetime -from frappe.utils.background_jobs import get_redis_conn +from frappe.utils.background_jobs import get_redis_conn, get_queues from frappe.utils.scheduler import is_scheduler_inactive if TYPE_CHECKING: @@ -29,7 +29,7 @@ def get_info(show_failed=False) -> List[Dict]: show_failed = json.loads(show_failed) conn = get_redis_conn() - queues = Queue.all(conn) + queues = get_queues() workers = Worker.all(conn) jobs = [] @@ -75,7 +75,7 @@ def get_info(show_failed=False) -> List[Dict]: @frappe.whitelist() def remove_failed_jobs(): conn = get_redis_conn() - queues = Queue.all(conn) + queues = get_queues() for queue in queues: fail_registry = queue.failed_job_registry for job_id in fail_registry.get_job_ids(): diff --git a/frappe/test_runner.py b/frappe/test_runner.py index 0c30fbbd00..8112362f34 100644 --- a/frappe/test_runner.py +++ b/frappe/test_runner.py @@ -56,6 +56,7 @@ def main(app=None, module=None, doctype=None, verbose=False, tests=(), frappe.clear_cache() frappe.utils.scheduler.disable_scheduler() set_test_email_config() + frappe.conf.update({'bench_id': 'test_bench', 'use_rq_auth': False}) if not frappe.flags.skip_before_tests: if verbose: diff --git a/frappe/tests/test_background_jobs.py b/frappe/tests/test_background_jobs.py index 88783f14f1..188f3e166f 100644 --- a/frappe/tests/test_background_jobs.py +++ b/frappe/tests/test_background_jobs.py @@ -4,7 +4,7 @@ from rq import Queue import frappe from frappe.core.page.background_jobs.background_jobs import remove_failed_jobs -from frappe.utils.background_jobs import get_redis_conn +from frappe.utils.background_jobs import get_redis_conn, generate_qname import time @@ -17,14 +17,14 @@ class TestBackgroundJobs(unittest.TestCase): queues = Queue.all(conn) for queue in queues: - if queue.name == "short": + if queue.name == generate_qname("short"): fail_registry = queue.failed_job_registry self.assertGreater(fail_registry.count, 0) remove_failed_jobs() for queue in queues: - if queue.name == "short": + if queue.name == generate_qname("short"): fail_registry = queue.failed_job_registry self.assertEqual(fail_registry.count, 0) diff --git a/frappe/tests/test_redis.py b/frappe/tests/test_redis.py new file mode 100644 index 0000000000..72af1ac699 --- /dev/null +++ b/frappe/tests/test_redis.py @@ -0,0 +1,70 @@ +import unittest +import functools + +import redis + +import frappe +from frappe.utils import get_bench_id +from frappe.utils.rq import RedisQueue +from frappe.utils.background_jobs import get_redis_conn + +def version_tuple(version): + return tuple(map(int, (version.split(".")))) + +def skip_if_redis_version_lt(version): + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + conn = get_redis_conn() + redis_version = conn.execute_command('info')['redis_version'] + if version_tuple(redis_version) < version_tuple(version): + return + return func(*args, **kwargs) + return wrapper + return decorator + +class TestRedisAuth(unittest.TestCase): + @skip_if_redis_version_lt('6.0') + def test_rq_gen_acllist(self): + """Make sure that ACL list is genrated + """ + acl_list = RedisQueue.gen_acl_list() + self.assertEqual(acl_list[1]['bench'][0], get_bench_id()) + + @skip_if_redis_version_lt('6.0') + def test_adding_redis_user(self): + acl_list = RedisQueue.gen_acl_list() + username, password = acl_list[1]['bench'] + conn = get_redis_conn() + + conn.acl_deluser(username) + _ = RedisQueue(conn).add_user(username, password) + self.assertTrue(conn.acl_getuser(username)) + conn.acl_deluser(username) + + @skip_if_redis_version_lt('6.0') + def test_rq_namespace(self): + """Make sure that user can access only their respective namespace. + """ + # Current bench ID + bench_id = frappe.conf.get('bench_id') + conn = get_redis_conn() + conn.set('rq:queue:test_bench1:abc', 'value') + conn.set(f'rq:queue:{bench_id}:abc', 'value') + + # Create new Redis Queue user + tmp_bench_id = 'test_bench1' + username, password = tmp_bench_id, 'password1' + conn.acl_deluser(username) + frappe.conf.update({'bench_id': tmp_bench_id}) + _ = RedisQueue(conn).add_user(username, password) + test_bench1_conn = RedisQueue.get_connection(username, password) + + self.assertEqual(test_bench1_conn.get('rq:queue:test_bench1:abc'), b'value') + + # User should not be able to access queues apart from their bench queues + with self.assertRaises(redis.exceptions.NoPermissionError): + test_bench1_conn.get(f'rq:queue:{bench_id}:abc') + + frappe.conf.update({'bench_id': bench_id}) + conn.acl_deluser(username) diff --git a/frappe/utils/__init__.py b/frappe/utils/__init__.py index af9d5de1ee..68366eb234 100644 --- a/frappe/utils/__init__.py +++ b/frappe/utils/__init__.py @@ -383,6 +383,12 @@ def get_files_path(*path, **kwargs): def get_bench_path(): return os.path.realpath(os.path.join(os.path.dirname(frappe.__file__), '..', '..', '..')) +def get_bench_id(): + return frappe.get_conf().get('bench_id', get_bench_path().strip('/').replace('/', '-')) + +def get_site_id(site=None): + return f"{site or frappe.local.site}@{get_bench_id()}" + def get_backups_path(): return get_site_path("private", "backups") diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 8456835ca7..f0bd06aff4 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -1,13 +1,21 @@ +import os +import socket +import time +from uuid import uuid4 +from collections import defaultdict + + import redis +from typing import List from rq import Connection, Queue, Worker from rq.logutils import setup_loghandlers -from frappe.utils import cstr -from collections import defaultdict + import frappe -import os, socket, time from frappe import _ -from uuid import uuid4 import frappe.monitor +from frappe.utils import cstr, get_bench_id +from frappe.utils.rq import RedisQueue +from frappe.utils.commands import log default_timeout = 300 @@ -131,21 +139,22 @@ def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True, if is_async: frappe.destroy() -def start_worker(queue=None, quiet = False): +def start_worker(queue=None, quiet = False, rq_username=None, rq_password=None): '''Wrapper to start rq worker. Connects to redis and monitors these queues.''' with frappe.init_site(): # empty init is required to get redis_queue from common_site_config.json - redis_connection = get_redis_conn() + redis_connection = get_redis_conn(username=rq_username, password=rq_password) + queues = get_queue_list(queue, build_queue_name=True) + queue_name = queue and generate_qname(queue) if os.environ.get('CI'): setup_loghandlers('ERROR') with Connection(redis_connection): - queues = get_queue_list(queue) logging_level = "INFO" if quiet: logging_level = "WARNING" - Worker(queues, name=get_worker_name(queue)).work(logging_level = logging_level) + Worker(queues, name=get_worker_name(queue_name)).work(logging_level = logging_level) def get_worker_name(queue): '''When limiting worker to a specific queue, also append queue name to default worker name''' @@ -186,7 +195,7 @@ def get_jobs(site=None, queue=None, key='method'): return jobs_per_site -def get_queue_list(queue_list=None): +def get_queue_list(queue_list=None, build_queue_name=False): '''Defines possible queues. Also wraps a given queue in a list after validating.''' default_queue_list = list(queue_timeout) if queue_list: @@ -195,11 +204,9 @@ def get_queue_list(queue_list=None): for queue in queue_list: validate_queue(queue, default_queue_list) - - return queue_list - else: - return default_queue_list + queue_list = default_queue_list + return [generate_qname(qtype) for qtype in queue_list] if build_queue_name else queue_list def get_workers(queue): '''Returns a list of Worker objects tied to a queue object''' @@ -215,10 +222,10 @@ def get_running_jobs_in_queue(queue): jobs.append(current_job) return jobs -def get_queue(queue, is_async=True): +def get_queue(qtype, is_async=True): '''Returns a Queue object tied to a redis connection''' - validate_queue(queue) - return Queue(queue, connection=get_redis_conn(), is_async=is_async) + validate_queue(qtype) + return Queue(generate_qname(qtype), connection=get_redis_conn(), is_async=is_async) def validate_queue(queue, default_queue_list=None): if not default_queue_list: @@ -227,7 +234,7 @@ def validate_queue(queue, default_queue_list=None): if queue not in default_queue_list: frappe.throw(_("Queue should be one of {0}").format(', '.join(default_queue_list))) -def get_redis_conn(): +def get_redis_conn(username=None, password=None): if not hasattr(frappe.local, 'conf'): raise Exception('You need to call frappe.init') @@ -236,11 +243,50 @@ def get_redis_conn(): global redis_connection - if not redis_connection: - redis_connection = redis.from_url(frappe.local.conf.redis_queue) + cred = frappe._dict() + if frappe.conf.get('use_rq_auth'): + if username: + cred['username'] = username + cred['password'] = password + else: + cred['username'] = frappe.get_site_config().rq_username or get_bench_id() + cred['password'] = frappe.get_site_config().rq_password + + elif os.environ.get('RQ_ADMIN_PASWORD'): + cred['username'] = 'default' + cred['password'] = os.environ.get('RQ_ADMIN_PASWORD') + try: + redis_connection = RedisQueue.get_connection(**cred) + except (redis.exceptions.AuthenticationError, redis.exceptions.ResponseError): + log(f'Wrong credentials used for {cred.username or "default user"}. ' + 'You can reset credentials using `bench create-rq-users` CLI and restart the server', + colour='red') + raise + except Exception: + log(f'Please make sure that Redis Queue runs @ {frappe.get_conf().redis_queue}', colour='red') + raise return redis_connection +def get_queues() -> List[Queue]: + """Get all the queues linked to the current bench. + """ + queues = Queue.all(connection=get_redis_conn()) + return [q for q in queues if is_queue_accessible(q)] + +def generate_qname(qtype: str) -> str: + """Generate qname by combining bench ID and queue type. + + qnames are useful to define namespaces of customers. + """ + return f"{get_bench_id()}:{qtype}" + +def is_queue_accessible(qobj: Queue) -> bool: + """Checks whether queue is relate to current bench or not. + """ + accessible_queues = [generate_qname(q) for q in list(queue_timeout)] + return qobj.name in accessible_queues + def enqueue_test_job(): enqueue('frappe.utils.background_jobs.test_job', s=100) diff --git a/frappe/utils/rq.py b/frappe/utils/rq.py new file mode 100644 index 0000000000..b344b0caa5 --- /dev/null +++ b/frappe/utils/rq.py @@ -0,0 +1,83 @@ +import redis + +import frappe +from frappe.utils import get_bench_id, random_string + +class RedisQueue: + def __init__(self, conn): + self.conn = conn + + def add_user(self, username, password=None): + """Create or update the user. + """ + password = password or self.conn.acl_genpass() + user_settings = self.get_new_user_settings(username, password) + is_created = self.conn.acl_setuser(**user_settings) + return frappe._dict(user_settings) if is_created else {} + + @classmethod + def get_connection(cls, username=None, password=None): + rq_url = frappe.local.conf.redis_queue + domain = rq_url.split("redis://", 1)[-1] + url = (username and f"redis://{username}:{password or ''}@{domain}") or rq_url + conn = redis.from_url(url) + conn.ping() + return conn + + @classmethod + def new(cls, username='default', password=None): + return cls(cls.get_connection(username, password)) + + @classmethod + def set_admin_password(cls, cur_password=None, new_password=None, reset_passwords=False): + username = 'default' + conn = cls.get_connection(username, cur_password) + password = '+'+(new_password or conn.acl_genpass()) + conn.acl_setuser( + username=username, enabled=True, reset_passwords=reset_passwords, passwords=password + ) + return password[1:] + + @classmethod + def get_new_user_settings(cls, username, password): + d = {} + d['username'] = username + d['passwords'] = '+'+password + d['reset_keys'] = True + d['enabled'] = True + d['keys'] = cls.get_acl_key_rules() + d['commands'] = cls.get_acl_command_rules() + return d + + @classmethod + def get_acl_key_rules(cls, include_key_prefix=False): + """FIXME: Find better way + """ + rules = ['rq:[^q]*', 'rq:queues', f'rq:queue:{get_bench_id()}:*'] + if include_key_prefix: + return ['~'+pattern for pattern in rules] + return rules + + @classmethod + def get_acl_command_rules(cls): + return ['+@all', '-@admin'] + + @classmethod + def gen_acl_list(cls, set_admin_password=False): + """Generate list of ACL users needed for this branch. + + This list contains default ACL user and the bench ACL user(used by all sites incase of ACL is enabled). + """ + bench_username = get_bench_id() + bench_user_rules = cls.get_acl_key_rules(include_key_prefix=True) + cls.get_acl_command_rules() + bench_user_rule_str = ' '.join(bench_user_rules).strip() + bench_user_password = random_string(20) + + default_username = 'default' + _default_user_password = random_string(20) if set_admin_password else '' + default_user_password = '>'+_default_user_password if _default_user_password else 'nopass' + + return [ + f'user {default_username} on {default_user_password} ~* &* +@all', + f'user {bench_username} on >{bench_user_password} {bench_user_rule_str}' + ], {'bench': (bench_username, bench_user_password), 'default': (default_username, _default_user_password)}