2
0
mirror of https://github.com/frappe/bench.git synced 2024-09-23 20:49:01 +00:00

[fix] celery to rq

This commit is contained in:
Anand Doshi 2016-04-12 20:29:09 +05:30
parent 4f93b1b80c
commit ef4ec51d48
15 changed files with 169 additions and 89 deletions

View File

@ -1,5 +1,5 @@
from jinja2 import Environment, PackageLoader from jinja2 import Environment, PackageLoader
__version__ = "2.1.0" __version__ = "3.0.0"
env = Environment(loader=PackageLoader('bench.config'), trim_blocks=True) env = Environment(loader=PackageLoader('bench.config'), trim_blocks=True)

View File

@ -1,9 +1,10 @@
import click import click
import sys, os import sys, os
from bench.config.common_site_config import get_config, deprecate_old_config from bench.config.common_site_config import get_config
from bench.app import pull_all_apps, is_version_upgrade from bench.app import pull_all_apps, is_version_upgrade
from bench.utils import (update_bench, validate_upgrade, pre_upgrade, post_upgrade, before_update, from bench.utils import (update_bench, validate_upgrade, pre_upgrade, post_upgrade, before_update,
update_requirements, backup_all_sites, patch_sites, build_assets, restart_supervisor_processes) update_requirements, backup_all_sites, patch_sites, build_assets, restart_supervisor_processes)
from bench import patches
#TODO: Not DRY #TODO: Not DRY
@click.command('update') @click.command('update')
@ -23,7 +24,7 @@ def update(pull=False, patch=False, build=False, bench=False, auto=False, restar
if not (pull or patch or build or bench or requirements): if not (pull or patch or build or bench or requirements):
pull, patch, build, bench, requirements = True, True, True, True, True pull, patch, build, bench, requirements = True, True, True, True, True
deprecate_old_config(".") patches.run(bench_path='.')
conf = get_config(".") conf = get_config(".")

View File

@ -3,11 +3,12 @@ import sys, os, copy
@click.command('start') @click.command('start')
@click.option('--no-dev', is_flag=True) @click.option('--no-dev', is_flag=True, default=False)
def start(no_dev=False): @click.option('--concurrency', '-c', type=str)
def start(no_dev, concurrency):
"Start Frappe development processes" "Start Frappe development processes"
from bench.utils import start from bench.utils import start
start(no_dev=no_dev) start(no_dev=no_dev, concurrency=concurrency)
@click.command('restart') @click.command('restart')

View File

@ -1,8 +1,4 @@
import os import os, multiprocessing, getpass, json, urlparse
import multiprocessing
import getpass
import json
import urlparse
default_config = { default_config = {
'restart_supervisor_on_update': False, 'restart_supervisor_on_update': False,
@ -12,6 +8,7 @@ default_config = {
'update_bench_on_update': True, 'update_bench_on_update': True,
'frappe_user': getpass.getuser(), 'frappe_user': getpass.getuser(),
'shallow_clone': True, 'shallow_clone': True,
'background_workers': 1
} }
def make_config(bench_path): def make_config(bench_path):
@ -109,37 +106,3 @@ def make_pid_folder(bench_path):
pids_path = os.path.join(bench_path, 'config', 'pids') pids_path = os.path.join(bench_path, 'config', 'pids')
if not os.path.exists(pids_path): if not os.path.exists(pids_path):
os.makedirs(pids_path) os.makedirs(pids_path)
def deprecate_old_config(bench_path):
# deprecate bench config
bench_config_path = os.path.join(bench_path, 'config.json')
if os.path.exists(bench_config_path):
with open(bench_config_path, "r") as f:
bench_config = json.loads(f.read())
common_site_config = get_common_site_config(bench_path)
common_site_config.update(bench_config)
put_config(common_site_config, bench_path)
# remove bench/config.json
os.remove(bench_config_path)
# change keys
config = get_config(bench_path)
changed = False
for from_key, to_key, default in (
("celery_broker", "redis_queue", "redis://localhost:6379"),
("async_redis_server", "redis_socketio", "redis://localhost:12311"),
("cache_redis_server", "redis_cache", "redis://localhost:11311")
):
if from_key in config:
config[to_key] = config[from_key]
del config[from_key]
changed = True
elif to_key not in config:
config[to_key] = default
changed = True
if changed:
put_config(config, bench_path)

View File

@ -1,8 +1,7 @@
import os import os, json, click
import json
from bench.utils import get_sites, get_bench_name from bench.utils import get_sites, get_bench_name
def make_nginx_conf(bench_path): def make_nginx_conf(bench_path, force=False):
from bench import env from bench import env
from bench.config.common_site_config import get_config from bench.config.common_site_config import get_config
@ -22,7 +21,12 @@ def make_nginx_conf(bench_path):
"bench_name": get_bench_name(bench_path) "bench_name": get_bench_name(bench_path)
}) })
with open(os.path.join(bench_path, "config", "nginx.conf"), "w") as f: conf_path = os.path.join(bench_path, "config", "nginx.conf")
if not force and os.path.exists(conf_path):
click.confirm('nginx.conf already exists and this will overwrite it. Do you want to continue?',
abort=True)
with open(conf_path, "w") as f:
f.write(nginx_conf) f.write(nginx_conf)
def prepare_sites(config, bench_path): def prepare_sites(config, bench_path):

View File

@ -1,9 +1,14 @@
import bench, os import bench, os, click
from bench.utils import find_executable from bench.utils import find_executable
def setup_procfile(bench_path): def setup_procfile(bench_path, force=False):
procfile_path = os.path.join(bench_path, 'Procfile')
if not force and os.path.exists(procfile_path):
click.confirm('A Procfile already exists and this will overwrite it. Do you want to continue?',
abort=True)
procfile = bench.env.get_template('Procfile').render(node=find_executable("node") \ procfile = bench.env.get_template('Procfile').render(node=find_executable("node") \
or find_executable("nodejs")) or find_executable("nodejs"))
with open(os.path.join(bench_path, 'Procfile'), 'w') as f: with open(procfile_path, 'w') as f:
f.write(procfile) f.write(procfile)

View File

@ -1,6 +1,7 @@
import os, getpass, bench import os, getpass, click
import bench
def generate_supervisor_config(bench_path, user=None): def generate_supervisor_config(bench_path, user=None, force=False):
from bench.app import get_current_frappe_version from bench.app import get_current_frappe_version
from bench.utils import get_bench_name, find_executable from bench.utils import get_bench_name, find_executable
from bench.config.common_site_config import get_config, update_config, get_gunicorn_workers from bench.config.common_site_config import get_config, update_config, get_gunicorn_workers
@ -26,10 +27,16 @@ def generate_supervisor_config(bench_path, user=None):
"frappe_version": get_current_frappe_version(), "frappe_version": get_current_frappe_version(),
"webserver_port": config.get('webserver_port', 8000), "webserver_port": config.get('webserver_port', 8000),
"gunicorn_workers": config.get('gunicorn_workers', get_gunicorn_workers()["gunicorn_workers"]), "gunicorn_workers": config.get('gunicorn_workers', get_gunicorn_workers()["gunicorn_workers"]),
"bench_name": get_bench_name(bench_path) "bench_name": get_bench_name(bench_path),
"background_workers": config.get('background_workers') or 1
}) })
with open(os.path.join(bench_path, 'config', 'supervisor.conf'), 'w') as f: conf_path = os.path.join(bench_path, 'config', 'supervisor.conf')
if not force and os.path.exists(conf_path):
click.confirm('supervisor.conf already exists and this will overwrite it. Do you want to continue?',
abort=True)
with open(conf_path, 'w') as f:
f.write(config) f.write(config)
update_config({'restart_supervisor_on_update': True}, bench=bench_path) update_config({'restart_supervisor_on_update': True}, bench=bench_path)

View File

@ -3,8 +3,8 @@ redis_socketio: redis-server config/redis_socketio.conf
redis_queue: redis-server config/redis_queue.conf redis_queue: redis-server config/redis_queue.conf
web: bench serve web: bench serve
socketio: {{ node }} apps/frappe/socketio.js socketio: {{ node }} apps/frappe/socketio.js
workerbeat: sh -c 'cd sites && exec ../env/bin/python -m frappe.celery_app beat -s scheduler.schedule'
worker: sh -c 'cd sites && exec ../env/bin/python -m frappe.celery_app worker -n jobs@%h -Ofair --soft-time-limit 360 --time-limit 390'
longjob_worker: sh -c 'cd sites && exec ../env/bin/python -m frappe.celery_app worker -n longjobs@%h -Ofair --soft-time-limit 1500 --time-limit 1530'
async_worker: sh -c 'cd sites && exec ../env/bin/python -m frappe.celery_app worker -n async@%h -Ofair --soft-time-limit 1500 --time-limit 1530'
watch: bench watch watch: bench watch
schedule: bench schedule
worker_short: bench worker --queue short
worker_long: bench worker --queue long
worker_default: bench worker --queue default

View File

@ -12,51 +12,57 @@ stderr_logfile={{ bench_dir }}/logs/web.error.log
user={{ user }} user={{ user }}
directory={{ sites_dir }} directory={{ sites_dir }}
[program:{{ bench_name }}-frappe-worker] [program:{{ bench_name }}-frappe-schedule]
command={{ bench_dir }}/env/bin/python -m frappe.celery_app worker -n jobs@%%h -Ofair --soft-time-limit 360 --time-limit 390 --loglevel INFO command=bench schedule
priority=3
autostart=true
autorestart=true
stdout_logfile={{ bench_dir }}/logs/schedule.log
stderr_logfile={{ bench_dir }}/logs/schedule.error.log
user={{ user }}
directory={{ bench_dir }}
[program:{{ bench_name }}-frappe-default-worker]
command=bench worker --queue default
priority=4 priority=4
autostart=true autostart=true
autorestart=true autorestart=true
stdout_logfile={{ bench_dir }}/logs/worker.log stdout_logfile={{ bench_dir }}/logs/worker.log
stderr_logfile={{ bench_dir }}/logs/worker.error.log stderr_logfile={{ bench_dir }}/logs/worker.error.log
user={{ user }} user={{ user }}
stopwaitsecs=400 stopwaitsecs=1560
directory={{ sites_dir }} directory={{ bench_dir }}
killasgroup=true killasgroup=true
numprocs={{ background_workers }}
process_name=%(program_name)s-%(process_num)d
[program:{{ bench_name }}-frappe-longjob-worker] [program:{{ bench_name }}-frappe-short-worker]
command={{ bench_dir }}/env/bin/python -m frappe.celery_app worker -n longjobs@%%h -Ofair --soft-time-limit 1500 --time-limit 1530 --loglevel INFO command=bench worker --queue short
priority=2 priority=4
autostart=true autostart=true
autorestart=true autorestart=true
stdout_logfile={{ bench_dir }}/logs/worker.log stdout_logfile={{ bench_dir }}/logs/worker.log
stderr_logfile={{ bench_dir }}/logs/worker.error.log stderr_logfile={{ bench_dir }}/logs/worker.error.log
user={{ user }} user={{ user }}
stopwaitsecs=1540 stopwaitsecs=360
directory={{ sites_dir }} directory={{ bench_dir }}
killasgroup=true killasgroup=true
numprocs={{ background_workers }}
process_name=%(program_name)s-%(process_num)d
[program:{{ bench_name }}-frappe-async-worker] [program:{{ bench_name }}-frappe-long-worker]
command={{ bench_dir }}/env/bin/python -m frappe.celery_app worker -n async@%%h -Ofair --soft-time-limit 1500 --time-limit 1530 --loglevel INFO command=bench worker --queue long
priority=2 priority=4
autostart=true autostart=true
autorestart=true autorestart=true
stdout_logfile={{ bench_dir }}/logs/worker.log stdout_logfile={{ bench_dir }}/logs/worker.log
stderr_logfile={{ bench_dir }}/logs/worker.error.log stderr_logfile={{ bench_dir }}/logs/worker.error.log
user={{ user }} user={{ user }}
stopwaitsecs=1540 stopwaitsecs=1560
directory={{ sites_dir }} directory={{ bench_dir }}
killasgroup=true killasgroup=true
numprocs={{ background_workers }}
[program:{{ bench_name }}-frappe-workerbeat] process_name=%(program_name)s-%(process_num)d
command={{ bench_dir }}/env/bin/python -m frappe.celery_app beat -s beat.schedule
priority=3
autostart=true
autorestart=true
stdout_logfile={{ bench_dir }}/logs/workerbeat.log
stderr_logfile={{ bench_dir }}/logs/workerbeat.error.log
user={{ user }}
directory={{ sites_dir }}
[program:{{ bench_name }}-redis-cache] [program:{{ bench_name }}-redis-cache]
command={{ redis_server }} {{ redis_cache_config }} command={{ redis_server }} {{ redis_cache_config }}
@ -103,8 +109,11 @@ directory={{ bench_dir }}
{% endif %} {% endif %}
[group:{{ bench_name }}-processes] [group:{{ bench_name }}-web]
programs={{ bench_name }}-frappe-web,{{ bench_name }}-frappe-worker,{{ bench_name }}-frappe-longjob-worker,{{ bench_name }}-frappe-async-worker,{{ bench_name }}-frappe-workerbeat {%- if node -%} ,{{ bench_name }}-node-socketio {%- endif%} programs={{ bench_name }}-frappe-web {%- if node -%} ,{{ bench_name }}-node-socketio {%- endif%}
[group:{{ bench_name }}-workers]
programs={{ bench_name }}-frappe-schedule,{{ bench_name }}-frappe-default-worker,{{ bench_name }}-frappe-short-worker,{{ bench_name }}-frappe-long-worker
[group:{{ bench_name }}-redis] [group:{{ bench_name }}-redis]
programs={{ bench_name }}-redis-cache,{{ bench_name }}-redis-queue {%- if frappe_version > 5 -%} ,{{ bench_name }}-redis-socketio {%- endif %} programs={{ bench_name }}-redis-cache,{{ bench_name }}-redis-queue {%- if frappe_version > 5 -%} ,{{ bench_name }}-redis-socketio {%- endif %}

29
bench/patches/__init__.py Normal file
View File

@ -0,0 +1,29 @@
import os, importlib
def run(bench_path):
source_patch_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'patches.txt')
target_patch_file = os.path.join(os.path.abspath(bench_path), 'patches.txt')
with open(source_patch_file, 'r') as f:
patches = [p.strip() for p in f.read().splitlines()
if p.strip() and not p.strip().startswith("#")]
executed_patches = []
if os.path.exists(target_patch_file):
with open(target_patch_file, 'r') as f:
executed_patches = f.read().splitlines()
try:
for patch in patches:
if patch not in executed_patches:
module = importlib.import_module(patch.split()[0])
execute = getattr(module, 'execute')
execute(bench_path)
executed_patches.append(patch)
finally:
with open(target_patch_file, 'w') as f:
f.write('\n'.join(executed_patches))
# end with an empty line
f.write('\n')

View File

@ -0,0 +1,2 @@
bench.patches.v3.deprecate_old_config
bench.patches.v3.celery_to_rq

View File

View File

@ -0,0 +1,16 @@
import click, os
from bench.config.procfile import setup_procfile
from bench.config.supervisor import generate_supervisor_config
def execute(bench_path):
click.confirm('\nThis update will remove Celery config and prepare the bench to use Python RQ.\n'
'And it will overwrite Procfile and supervisor.conf.\n'
'If you don\'t know what this means, type Y ;)\n\n'
'Do you want to continue?',
abort=True)
setup_procfile(bench_path, force=True)
# if production setup
if os.path.exists(os.path.join(bench_path, 'config', 'supervisor.conf')):
generate_supervisor_config(bench_path, force=True)

View File

@ -0,0 +1,38 @@
import os, json
from bench.config.common_site_config import get_config, put_config, get_common_site_config
def execute(bench_path):
# deprecate bench config
bench_config_path = os.path.join(bench_path, 'config.json')
if not os.path.exists(bench_config_path):
return
with open(bench_config_path, "r") as f:
bench_config = json.loads(f.read())
common_site_config = get_common_site_config(bench_path)
common_site_config.update(bench_config)
put_config(common_site_config, bench_path)
# remove bench/config.json
os.remove(bench_config_path)
# change keys
config = get_config(bench_path)
changed = False
for from_key, to_key, default in (
("celery_broker", "redis_queue", "redis://localhost:6379"),
("async_redis_server", "redis_socketio", "redis://localhost:12311"),
("cache_redis_server", "redis_cache", "redis://localhost:11311")
):
if from_key in config:
config[to_key] = config[from_key]
del config[from_key]
changed = True
elif to_key not in config:
config[to_key] = default
changed = True
if changed:
put_config(config, bench_path)

View File

@ -221,14 +221,19 @@ def get_program(programs):
def get_process_manager(): def get_process_manager():
return get_program(['foreman', 'forego', 'honcho']) return get_program(['foreman', 'forego', 'honcho'])
def start(no_dev=False): def start(no_dev=False, concurrency=None):
program = get_process_manager() program = get_process_manager()
if not program: if not program:
raise Exception("No process manager found") raise Exception("No process manager found")
os.environ['PYTHONUNBUFFERED'] = "true" os.environ['PYTHONUNBUFFERED'] = "true"
if not no_dev: if not no_dev:
os.environ['DEV_SERVER'] = "true" os.environ['DEV_SERVER'] = "true"
os.execv(program, [program, 'start'])
command = [program, 'start']
if concurrency:
command.extend(['-c', concurrency])
os.execv(program, command)
def check_cmd(cmd, cwd='.'): def check_cmd(cmd, cwd='.'):
try: try: