From ef4ec51d4839ef6ec2d9160c82404606698e40b2 Mon Sep 17 00:00:00 2001 From: Anand Doshi Date: Tue, 12 Apr 2016 20:29:09 +0530 Subject: [PATCH] [fix] celery to rq --- bench/__init__.py | 2 +- bench/commands/update.py | 5 +- bench/commands/utils.py | 7 +-- bench/config/common_site_config.py | 41 +--------------- bench/config/nginx.py | 12 +++-- bench/config/procfile.py | 13 +++-- bench/config/supervisor.py | 15 ++++-- bench/config/templates/Procfile | 8 ++-- bench/config/templates/supervisor.conf | 61 ++++++++++++++---------- bench/patches/__init__.py | 29 +++++++++++ bench/patches/patches.txt | 2 + bench/patches/v3/__init__.py | 0 bench/patches/v3/celery_to_rq.py | 16 +++++++ bench/patches/v3/deprecate_old_config.py | 38 +++++++++++++++ bench/utils.py | 9 +++- 15 files changed, 169 insertions(+), 89 deletions(-) create mode 100644 bench/patches/__init__.py create mode 100644 bench/patches/patches.txt create mode 100644 bench/patches/v3/__init__.py create mode 100644 bench/patches/v3/celery_to_rq.py create mode 100644 bench/patches/v3/deprecate_old_config.py diff --git a/bench/__init__.py b/bench/__init__.py index 9eaedcc9..7bb36647 100644 --- a/bench/__init__.py +++ b/bench/__init__.py @@ -1,5 +1,5 @@ from jinja2 import Environment, PackageLoader -__version__ = "2.1.0" +__version__ = "3.0.0" env = Environment(loader=PackageLoader('bench.config'), trim_blocks=True) diff --git a/bench/commands/update.py b/bench/commands/update.py index 1b66236e..b570074c 100644 --- a/bench/commands/update.py +++ b/bench/commands/update.py @@ -1,9 +1,10 @@ import click import sys, os -from bench.config.common_site_config import get_config, deprecate_old_config +from bench.config.common_site_config import get_config from bench.app import pull_all_apps, is_version_upgrade from bench.utils import (update_bench, validate_upgrade, pre_upgrade, post_upgrade, before_update, update_requirements, backup_all_sites, patch_sites, build_assets, restart_supervisor_processes) +from bench import patches #TODO: Not DRY @click.command('update') @@ -23,7 +24,7 @@ def update(pull=False, patch=False, build=False, bench=False, auto=False, restar if not (pull or patch or build or bench or requirements): pull, patch, build, bench, requirements = True, True, True, True, True - deprecate_old_config(".") + patches.run(bench_path='.') conf = get_config(".") diff --git a/bench/commands/utils.py b/bench/commands/utils.py index e06a4b4b..a3f8bdbf 100644 --- a/bench/commands/utils.py +++ b/bench/commands/utils.py @@ -3,11 +3,12 @@ import sys, os, copy @click.command('start') -@click.option('--no-dev', is_flag=True) -def start(no_dev=False): +@click.option('--no-dev', is_flag=True, default=False) +@click.option('--concurrency', '-c', type=str) +def start(no_dev, concurrency): "Start Frappe development processes" from bench.utils import start - start(no_dev=no_dev) + start(no_dev=no_dev, concurrency=concurrency) @click.command('restart') diff --git a/bench/config/common_site_config.py b/bench/config/common_site_config.py index 5e2b14b2..588cf3ad 100644 --- a/bench/config/common_site_config.py +++ b/bench/config/common_site_config.py @@ -1,8 +1,4 @@ -import os -import multiprocessing -import getpass -import json -import urlparse +import os, multiprocessing, getpass, json, urlparse default_config = { 'restart_supervisor_on_update': False, @@ -12,6 +8,7 @@ default_config = { 'update_bench_on_update': True, 'frappe_user': getpass.getuser(), 'shallow_clone': True, + 'background_workers': 1 } def make_config(bench_path): @@ -109,37 +106,3 @@ def make_pid_folder(bench_path): pids_path = os.path.join(bench_path, 'config', 'pids') if not os.path.exists(pids_path): os.makedirs(pids_path) - -def deprecate_old_config(bench_path): - # deprecate bench config - bench_config_path = os.path.join(bench_path, 'config.json') - if os.path.exists(bench_config_path): - with open(bench_config_path, "r") as f: - bench_config = json.loads(f.read()) - - common_site_config = get_common_site_config(bench_path) - common_site_config.update(bench_config) - put_config(common_site_config, bench_path) - - # remove bench/config.json - os.remove(bench_config_path) - - # change keys - config = get_config(bench_path) - changed = False - for from_key, to_key, default in ( - ("celery_broker", "redis_queue", "redis://localhost:6379"), - ("async_redis_server", "redis_socketio", "redis://localhost:12311"), - ("cache_redis_server", "redis_cache", "redis://localhost:11311") - ): - if from_key in config: - config[to_key] = config[from_key] - del config[from_key] - changed = True - - elif to_key not in config: - config[to_key] = default - changed = True - - if changed: - put_config(config, bench_path) diff --git a/bench/config/nginx.py b/bench/config/nginx.py index 550d6e59..2a926c4d 100644 --- a/bench/config/nginx.py +++ b/bench/config/nginx.py @@ -1,8 +1,7 @@ -import os -import json +import os, json, click from bench.utils import get_sites, get_bench_name -def make_nginx_conf(bench_path): +def make_nginx_conf(bench_path, force=False): from bench import env from bench.config.common_site_config import get_config @@ -22,7 +21,12 @@ def make_nginx_conf(bench_path): "bench_name": get_bench_name(bench_path) }) - with open(os.path.join(bench_path, "config", "nginx.conf"), "w") as f: + conf_path = os.path.join(bench_path, "config", "nginx.conf") + if not force and os.path.exists(conf_path): + click.confirm('nginx.conf already exists and this will overwrite it. Do you want to continue?', + abort=True) + + with open(conf_path, "w") as f: f.write(nginx_conf) def prepare_sites(config, bench_path): diff --git a/bench/config/procfile.py b/bench/config/procfile.py index ec5c0141..8fea1849 100644 --- a/bench/config/procfile.py +++ b/bench/config/procfile.py @@ -1,9 +1,14 @@ -import bench, os +import bench, os, click from bench.utils import find_executable -def setup_procfile(bench_path): +def setup_procfile(bench_path, force=False): + procfile_path = os.path.join(bench_path, 'Procfile') + if not force and os.path.exists(procfile_path): + click.confirm('A Procfile already exists and this will overwrite it. Do you want to continue?', + abort=True) + procfile = bench.env.get_template('Procfile').render(node=find_executable("node") \ or find_executable("nodejs")) - - with open(os.path.join(bench_path, 'Procfile'), 'w') as f: + + with open(procfile_path, 'w') as f: f.write(procfile) diff --git a/bench/config/supervisor.py b/bench/config/supervisor.py index a7564e24..28339341 100644 --- a/bench/config/supervisor.py +++ b/bench/config/supervisor.py @@ -1,6 +1,7 @@ -import os, getpass, bench +import os, getpass, click +import bench -def generate_supervisor_config(bench_path, user=None): +def generate_supervisor_config(bench_path, user=None, force=False): from bench.app import get_current_frappe_version from bench.utils import get_bench_name, find_executable from bench.config.common_site_config import get_config, update_config, get_gunicorn_workers @@ -26,10 +27,16 @@ def generate_supervisor_config(bench_path, user=None): "frappe_version": get_current_frappe_version(), "webserver_port": config.get('webserver_port', 8000), "gunicorn_workers": config.get('gunicorn_workers', get_gunicorn_workers()["gunicorn_workers"]), - "bench_name": get_bench_name(bench_path) + "bench_name": get_bench_name(bench_path), + "background_workers": config.get('background_workers') or 1 }) - with open(os.path.join(bench_path, 'config', 'supervisor.conf'), 'w') as f: + conf_path = os.path.join(bench_path, 'config', 'supervisor.conf') + if not force and os.path.exists(conf_path): + click.confirm('supervisor.conf already exists and this will overwrite it. Do you want to continue?', + abort=True) + + with open(conf_path, 'w') as f: f.write(config) update_config({'restart_supervisor_on_update': True}, bench=bench_path) diff --git a/bench/config/templates/Procfile b/bench/config/templates/Procfile index e420e0dd..4d73ee41 100644 --- a/bench/config/templates/Procfile +++ b/bench/config/templates/Procfile @@ -3,8 +3,8 @@ redis_socketio: redis-server config/redis_socketio.conf redis_queue: redis-server config/redis_queue.conf web: bench serve socketio: {{ node }} apps/frappe/socketio.js -workerbeat: sh -c 'cd sites && exec ../env/bin/python -m frappe.celery_app beat -s scheduler.schedule' -worker: sh -c 'cd sites && exec ../env/bin/python -m frappe.celery_app worker -n jobs@%h -Ofair --soft-time-limit 360 --time-limit 390' -longjob_worker: sh -c 'cd sites && exec ../env/bin/python -m frappe.celery_app worker -n longjobs@%h -Ofair --soft-time-limit 1500 --time-limit 1530' -async_worker: sh -c 'cd sites && exec ../env/bin/python -m frappe.celery_app worker -n async@%h -Ofair --soft-time-limit 1500 --time-limit 1530' watch: bench watch +schedule: bench schedule +worker_short: bench worker --queue short +worker_long: bench worker --queue long +worker_default: bench worker --queue default diff --git a/bench/config/templates/supervisor.conf b/bench/config/templates/supervisor.conf index f97441da..3ca4a6de 100644 --- a/bench/config/templates/supervisor.conf +++ b/bench/config/templates/supervisor.conf @@ -12,51 +12,57 @@ stderr_logfile={{ bench_dir }}/logs/web.error.log user={{ user }} directory={{ sites_dir }} -[program:{{ bench_name }}-frappe-worker] -command={{ bench_dir }}/env/bin/python -m frappe.celery_app worker -n jobs@%%h -Ofair --soft-time-limit 360 --time-limit 390 --loglevel INFO +[program:{{ bench_name }}-frappe-schedule] +command=bench schedule +priority=3 +autostart=true +autorestart=true +stdout_logfile={{ bench_dir }}/logs/schedule.log +stderr_logfile={{ bench_dir }}/logs/schedule.error.log +user={{ user }} +directory={{ bench_dir }} + +[program:{{ bench_name }}-frappe-default-worker] +command=bench worker --queue default priority=4 autostart=true autorestart=true stdout_logfile={{ bench_dir }}/logs/worker.log stderr_logfile={{ bench_dir }}/logs/worker.error.log user={{ user }} -stopwaitsecs=400 -directory={{ sites_dir }} +stopwaitsecs=1560 +directory={{ bench_dir }} killasgroup=true +numprocs={{ background_workers }} +process_name=%(program_name)s-%(process_num)d -[program:{{ bench_name }}-frappe-longjob-worker] -command={{ bench_dir }}/env/bin/python -m frappe.celery_app worker -n longjobs@%%h -Ofair --soft-time-limit 1500 --time-limit 1530 --loglevel INFO -priority=2 +[program:{{ bench_name }}-frappe-short-worker] +command=bench worker --queue short +priority=4 autostart=true autorestart=true stdout_logfile={{ bench_dir }}/logs/worker.log stderr_logfile={{ bench_dir }}/logs/worker.error.log user={{ user }} -stopwaitsecs=1540 -directory={{ sites_dir }} +stopwaitsecs=360 +directory={{ bench_dir }} killasgroup=true +numprocs={{ background_workers }} +process_name=%(program_name)s-%(process_num)d -[program:{{ bench_name }}-frappe-async-worker] -command={{ bench_dir }}/env/bin/python -m frappe.celery_app worker -n async@%%h -Ofair --soft-time-limit 1500 --time-limit 1530 --loglevel INFO -priority=2 +[program:{{ bench_name }}-frappe-long-worker] +command=bench worker --queue long +priority=4 autostart=true autorestart=true stdout_logfile={{ bench_dir }}/logs/worker.log stderr_logfile={{ bench_dir }}/logs/worker.error.log user={{ user }} -stopwaitsecs=1540 -directory={{ sites_dir }} +stopwaitsecs=1560 +directory={{ bench_dir }} killasgroup=true - -[program:{{ bench_name }}-frappe-workerbeat] -command={{ bench_dir }}/env/bin/python -m frappe.celery_app beat -s beat.schedule -priority=3 -autostart=true -autorestart=true -stdout_logfile={{ bench_dir }}/logs/workerbeat.log -stderr_logfile={{ bench_dir }}/logs/workerbeat.error.log -user={{ user }} -directory={{ sites_dir }} +numprocs={{ background_workers }} +process_name=%(program_name)s-%(process_num)d [program:{{ bench_name }}-redis-cache] command={{ redis_server }} {{ redis_cache_config }} @@ -103,8 +109,11 @@ directory={{ bench_dir }} {% endif %} -[group:{{ bench_name }}-processes] -programs={{ bench_name }}-frappe-web,{{ bench_name }}-frappe-worker,{{ bench_name }}-frappe-longjob-worker,{{ bench_name }}-frappe-async-worker,{{ bench_name }}-frappe-workerbeat {%- if node -%} ,{{ bench_name }}-node-socketio {%- endif%} +[group:{{ bench_name }}-web] +programs={{ bench_name }}-frappe-web {%- if node -%} ,{{ bench_name }}-node-socketio {%- endif%} + +[group:{{ bench_name }}-workers] +programs={{ bench_name }}-frappe-schedule,{{ bench_name }}-frappe-default-worker,{{ bench_name }}-frappe-short-worker,{{ bench_name }}-frappe-long-worker [group:{{ bench_name }}-redis] programs={{ bench_name }}-redis-cache,{{ bench_name }}-redis-queue {%- if frappe_version > 5 -%} ,{{ bench_name }}-redis-socketio {%- endif %} diff --git a/bench/patches/__init__.py b/bench/patches/__init__.py new file mode 100644 index 00000000..61013f5b --- /dev/null +++ b/bench/patches/__init__.py @@ -0,0 +1,29 @@ +import os, importlib + +def run(bench_path): + source_patch_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'patches.txt') + target_patch_file = os.path.join(os.path.abspath(bench_path), 'patches.txt') + + with open(source_patch_file, 'r') as f: + patches = [p.strip() for p in f.read().splitlines() + if p.strip() and not p.strip().startswith("#")] + + executed_patches = [] + if os.path.exists(target_patch_file): + with open(target_patch_file, 'r') as f: + executed_patches = f.read().splitlines() + + try: + for patch in patches: + if patch not in executed_patches: + module = importlib.import_module(patch.split()[0]) + execute = getattr(module, 'execute') + execute(bench_path) + executed_patches.append(patch) + + finally: + with open(target_patch_file, 'w') as f: + f.write('\n'.join(executed_patches)) + + # end with an empty line + f.write('\n') diff --git a/bench/patches/patches.txt b/bench/patches/patches.txt new file mode 100644 index 00000000..138862fa --- /dev/null +++ b/bench/patches/patches.txt @@ -0,0 +1,2 @@ +bench.patches.v3.deprecate_old_config +bench.patches.v3.celery_to_rq diff --git a/bench/patches/v3/__init__.py b/bench/patches/v3/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/bench/patches/v3/celery_to_rq.py b/bench/patches/v3/celery_to_rq.py new file mode 100644 index 00000000..94472995 --- /dev/null +++ b/bench/patches/v3/celery_to_rq.py @@ -0,0 +1,16 @@ +import click, os +from bench.config.procfile import setup_procfile +from bench.config.supervisor import generate_supervisor_config + +def execute(bench_path): + click.confirm('\nThis update will remove Celery config and prepare the bench to use Python RQ.\n' + 'And it will overwrite Procfile and supervisor.conf.\n' + 'If you don\'t know what this means, type Y ;)\n\n' + 'Do you want to continue?', + abort=True) + + setup_procfile(bench_path, force=True) + + # if production setup + if os.path.exists(os.path.join(bench_path, 'config', 'supervisor.conf')): + generate_supervisor_config(bench_path, force=True) diff --git a/bench/patches/v3/deprecate_old_config.py b/bench/patches/v3/deprecate_old_config.py new file mode 100644 index 00000000..4b86d0d2 --- /dev/null +++ b/bench/patches/v3/deprecate_old_config.py @@ -0,0 +1,38 @@ +import os, json +from bench.config.common_site_config import get_config, put_config, get_common_site_config + +def execute(bench_path): + # deprecate bench config + bench_config_path = os.path.join(bench_path, 'config.json') + if not os.path.exists(bench_config_path): + return + + with open(bench_config_path, "r") as f: + bench_config = json.loads(f.read()) + + common_site_config = get_common_site_config(bench_path) + common_site_config.update(bench_config) + put_config(common_site_config, bench_path) + + # remove bench/config.json + os.remove(bench_config_path) + + # change keys + config = get_config(bench_path) + changed = False + for from_key, to_key, default in ( + ("celery_broker", "redis_queue", "redis://localhost:6379"), + ("async_redis_server", "redis_socketio", "redis://localhost:12311"), + ("cache_redis_server", "redis_cache", "redis://localhost:11311") + ): + if from_key in config: + config[to_key] = config[from_key] + del config[from_key] + changed = True + + elif to_key not in config: + config[to_key] = default + changed = True + + if changed: + put_config(config, bench_path) diff --git a/bench/utils.py b/bench/utils.py index 72401b12..97fb7a29 100644 --- a/bench/utils.py +++ b/bench/utils.py @@ -221,14 +221,19 @@ def get_program(programs): def get_process_manager(): return get_program(['foreman', 'forego', 'honcho']) -def start(no_dev=False): +def start(no_dev=False, concurrency=None): program = get_process_manager() if not program: raise Exception("No process manager found") os.environ['PYTHONUNBUFFERED'] = "true" if not no_dev: os.environ['DEV_SERVER'] = "true" - os.execv(program, [program, 'start']) + + command = [program, 'start'] + if concurrency: + command.extend(['-c', concurrency]) + + os.execv(program, command) def check_cmd(cmd, cwd='.'): try: