From bc7376b188b49c8b9d01ae0feb811593fc09fe45 Mon Sep 17 00:00:00 2001 From: Pratik Vyas Date: Sat, 4 Jul 2015 14:06:53 +0530 Subject: [PATCH] Async --- bench/app.py | 44 ++++++++------- bench/cli.py | 97 ++++++++++++++++++++------------ bench/config.py | 21 +++++-- bench/templates/redis.conf | 72 ------------------------ bench/templates/redis_cache.conf | 7 +++ bench/templates/supervisor.conf | 18 +++++- bench/utils.py | 81 ++++++++++++++++++++------ 7 files changed, 185 insertions(+), 155 deletions(-) delete mode 100644 bench/templates/redis.conf create mode 100644 bench/templates/redis_cache.conf diff --git a/bench/app.py b/bench/app.py index fb0dc42b..3a3cb489 100644 --- a/bench/app.py +++ b/bench/app.py @@ -79,54 +79,49 @@ def install_app(app, bench='.'): add_to_appstxt(app, bench=bench) def pull_all_apps(bench='.'): - apps_dir = os.path.join(bench, 'apps') - apps = [app for app in os.listdir(apps_dir) if os.path.isdir(os.path.join(apps_dir, app))] rebase = '--rebase' if get_config().get('rebase_on_pull') else '' - frappe_dir = os.path.join(apps_dir, 'frappe') - for app in apps: - app_dir = os.path.join(apps_dir, app) + for app in get_apps(bench=bench): + app_dir = get_repo_dir(app, bench=bench) if os.path.exists(os.path.join(app_dir, '.git')): logger.info('pulling {0}'.format(app)) - exec_cmd("git pull {rebase} upstream {branch}".format(rebase=rebase, branch=get_current_branch(app_dir)), cwd=app_dir) + exec_cmd("git pull {rebase} upstream {branch}".format(rebase=rebase, branch=get_current_branch(app, bench=bench)), cwd=app_dir) def is_version_upgrade(bench='.', branch=None): - apps_dir = os.path.join(bench, 'apps') - frappe_dir = os.path.join(apps_dir, 'frappe') - - fetch_upstream(frappe_dir) - upstream_version = get_upstream_version(frappe_dir, branch=branch) + fetch_upstream('frappe', bench=bench) + upstream_version = get_upstream_version('frappe', bench=bench, branch=branch) if not upstream_version: raise Exception("Current branch of 'frappe' not in upstream") - local_version = get_major_version(get_current_version(frappe_dir)) + local_version = get_major_version(get_current_version('frappe', bench=bench)) upstream_version = get_major_version(upstream_version) - if upstream_version - local_version > 0: + if upstream_version - local_version > 0: return (local_version, upstream_version) return False def get_current_frappe_version(bench='.'): - apps_dir = os.path.join(bench, 'apps') - frappe_dir = os.path.join(apps_dir, 'frappe') - try: - return get_major_version(get_current_version(frappe_dir)) + return get_major_version(get_current_version('frappe', bench=bench)) except IOError: return '' -def get_current_branch(repo_dir): +def get_current_branch(app, bench='.'): + repo_dir = get_repo_dir(app, bench=bench) return get_cmd_output("basename $(git symbolic-ref -q HEAD)", cwd=repo_dir) -def fetch_upstream(repo_dir): +def fetch_upstream(app, bench='.'): + repo_dir = get_repo_dir(app, bench=bench) return exec_cmd("git fetch upstream", cwd=repo_dir) -def get_current_version(repo_dir): +def get_current_version(app, bench='.'): + repo_dir = get_repo_dir(app, bench=bench) with open(os.path.join(repo_dir, 'setup.py')) as f: return get_version_from_string(f.read()) -def get_upstream_version(repo_dir, branch=None): +def get_upstream_version(app, branch=None, bench='.'): + repo_dir = get_repo_dir(app, bench=bench) if not branch: branch = get_current_branch(repo_dir) try: @@ -138,6 +133,13 @@ def get_upstream_version(repo_dir, branch=None): raise return get_version_from_string(contents) +def get_upstream_url(app, bench='.'): + repo_dir = get_repo_dir(app, bench=bench) + return subprocess.check_output(['git', 'config', '--get', 'remote.upstream.url'], cwd=repo_dir).strip() + +def get_repo_dir(app, bench='.'): + return os.path.join(bench, 'apps', app) + def switch_branch(branch, apps=None, bench='.', upgrade=False): from .utils import update_requirements, backup_all_sites, patch_sites, build_assets, pre_upgrade, post_upgrade import utils diff --git a/bench/cli.py b/bench/cli.py index 35a7d545..499d47b0 100644 --- a/bench/cli.py +++ b/bench/cli.py @@ -14,11 +14,11 @@ from .utils import (build_assets, patch_sites, exec_cmd, update_bench, get_env_c get_config, update_config, restart_supervisor_processes, put_config, default_config, update_requirements, backup_all_sites, backup_site, get_sites, prime_wheel_cache, is_root, set_mariadb_host, drop_privileges, fix_file_perms, fix_prod_setup_perms, set_ssl_certificate, set_ssl_certificate_key, get_cmd_output, post_upgrade, - pre_upgrade, PatchError, download_translations_p) + pre_upgrade, PatchError, download_translations_p, setup_socketio) from .app import get_app as _get_app from .app import new_app as _new_app from .app import pull_all_apps, get_apps, get_current_frappe_version, is_version_upgrade, switch_to_v4, switch_to_master, switch_to_develop -from .config import generate_nginx_config, generate_supervisor_config, generate_redis_config +from .config import generate_nginx_config, generate_supervisor_config, generate_redis_cache_config, generate_redis_async_broker_config from .production_setup import setup_production as _setup_production from .migrate_to_v5 import migrate_to_v5 import os @@ -194,29 +194,13 @@ def new_site(site, mariadb_root_password=None, admin_password=None): @click.option('--auto',flag_value=True, type=bool) @click.option('--upgrade',flag_value=True, type=bool) @click.option('--no-backup',flag_value=True, type=bool) -def update(pull=False, patch=False, build=False, bench=False, auto=False, restart_supervisor=False, requirements=False, no_backup=False, upgrade=False): +def _update(pull=False, patch=False, build=False, bench=False, auto=False, restart_supervisor=False, requirements=False, no_backup=False, upgrade=False): "Update bench" if not (pull or patch or build or bench or requirements): pull, patch, build, bench, requirements = True, True, True, True, True conf = get_config() - if conf.get('release_bench'): - print 'Release bench, cannot update' - sys.exit(1) - if auto: - sys.exit(1) - if bench and conf.get('update_bench_on_update'): - update_bench() - restart_update({ - 'pull': pull, - 'patch': patch, - 'build': build, - 'requirements': requirements, - 'no-backup': no_backup, - 'restart-supervisor': restart_supervisor, - 'upgrade': upgrade - }) version_upgrade = is_version_upgrade() @@ -229,35 +213,60 @@ def update(pull=False, patch=False, build=False, bench=False, auto=False, restar # print "You can also pin your bench to {0} by running `bench swtich-to-v{0}`".format(version_upgrade[0]) print "You can stay on the latest stable release by running `bench switch-to-master` or pin your bench to {0} by running `bench swtich-to-v{0}`".format(version_upgrade[0]) sys.exit(1) - elif not version_upgrade and upgrade: - upgrade = False + + if conf.get('release_bench'): + print 'Release bench, cannot update' + sys.exit(1) + + if auto: + sys.exit(1) + + if bench and conf.get('update_bench_on_update'): + update_bench() + restart_update({ + 'pull': pull, + 'patch': patch, + 'build': build, + 'requirements': requirements, + 'no-backup': no_backup, + 'restart-supervisor': restart_supervisor, + 'upgrade': upgrade + }) + + update(pull, patch, build, bench, auto, restart_supervisor, requirements, no_backup, upgrade) + + print "_"*80 + print "https://frappe.io/buy - Donate to help make better free and open source tools" + print + +def update(pull=False, patch=False, build=False, bench=False, auto=False, restart_supervisor=False, requirements=False, no_backup=False, upgrade=False, bench_path='.'): + conf = get_config(bench=bench_path) + version_upgrade = is_version_upgrade(bench=bench_path) + if version_upgrade and not upgrade: + raise Exception("Major Version Upgrade") if pull: - pull_all_apps() + pull_all_apps(bench=bench_path) if requirements: - update_requirements() + update_requirements(bench=bench_path) if upgrade: - pre_upgrade(version_upgrade[0], version_upgrade[1]) + pre_upgrade(version_upgrade[0], version_upgrade[1], bench=bench_path) import utils, app reload(utils) reload(app) if patch: if not no_backup: - backup_all_sites() - patch_sites() + backup_all_sites(bench=bench_path) + patch_sites(bench=bench_path) if build: - build_assets() + build_assets(bench=bench_path) if restart_supervisor or conf.get('restart_supervisor_on_update'): - restart_supervisor_processes() + restart_supervisor_processes(bench=bench_path) if upgrade: - post_upgrade(version_upgrade[0], version_upgrade[1]) - - print "_"*80 - print "https://frappe.io/buy - Donate to help make better free and open source tools" - print + post_upgrade(version_upgrade[0], version_upgrade[1], bench=bench_path) @click.command('retry-upgrade') @click.option('--version', default=5) @@ -412,7 +421,12 @@ def setup_supervisor(): @click.command('redis-cache') def setup_redis_cache(): "generate config for redis cache" - generate_redis_config() + generate_redis_cache_config() + +@click.command('redis-async-broker') +def setup_redis_async_broker(): + "generate config for redis async broker" + generate_redis_async_broker_config() @click.command('production') @click.argument('user') @@ -440,9 +454,16 @@ def setup_env(): _setup_env() @click.command('procfile') -def setup_procfile(): +@click.option('--with-watch', flag_value=True, type=bool) +@click.option('--with-celery-broker', flag_value=True, type=bool) +def setup_procfile(with_celery_broker, with_watch): "Setup Procfile for bench start" - _setup_procfile() + _setup_procfile(with_celery_broker, with_watch) + +@click.command('socketio') +def _setup_socketio(): + "Setup node deps for socketio server" + setup_socketio() @click.command('config') def setup_config(): @@ -453,11 +474,13 @@ setup.add_command(setup_nginx) setup.add_command(setup_sudoers) setup.add_command(setup_supervisor) setup.add_command(setup_redis_cache) +setup.add_command(setup_redis_async_broker) setup.add_command(setup_auto_update) setup.add_command(setup_dnsmasq) setup.add_command(setup_backups) setup.add_command(setup_env) setup.add_command(setup_procfile) +setup.add_command(_setup_socketio) setup.add_command(setup_config) setup.add_command(setup_production) @@ -562,7 +585,7 @@ bench.add_command(get_app) bench.add_command(new_app) bench.add_command(new_site) bench.add_command(setup) -bench.add_command(update) +bench.add_command(_update) bench.add_command(restart) bench.add_command(config) bench.add_command(start) diff --git a/bench/config.py b/bench/config.py index 4d698a6e..c39b4322 100644 --- a/bench/config.py +++ b/bench/config.py @@ -36,7 +36,8 @@ def generate_supervisor_config(bench='.', user=None): "user": user, "http_timeout": config.get("http_timeout", 120), "redis_server": subprocess.check_output('which redis-server', shell=True).strip(), - "redis_config": os.path.join(bench_dir, 'config', 'redis.conf'), + "redis_cache_config": os.path.join(bench_dir, 'config', 'redis_cache.conf'), + "redis_async_broker_config": os.path.join(bench_dir, 'config', 'redis_async_broker.conf'), "frappe_version": get_current_frappe_version() }) write_config_file(bench, 'supervisor.conf', config) @@ -47,7 +48,7 @@ def get_site_config(site, bench='.'): return json.load(f) def get_sites_with_config(bench='.'): - sites = get_sites() + sites = get_sites(bench=bench) ret = [] for site in sites: site_config = get_site_config(site, bench=bench) @@ -84,12 +85,22 @@ def generate_nginx_config(bench='.'): }) write_config_file(bench, 'nginx.conf', config) -def generate_redis_config(bench='.'): - template = env.get_template('redis.conf') +def generate_redis_cache_config(bench='.'): + template = env.get_template('redis_cache.conf') conf = { "maxmemory": get_config().get('cache_maxmemory', '50'), "port": get_config().get('redis_cache_port', '11311'), "redis_version": get_redis_version() } config = template.render(**conf) - write_config_file(bench, 'redis.conf', config) + write_config_file(bench, 'redis_cache.conf', config) + + +def generate_redis_async_broker_config(bench='.'): + template = env.get_template('redis_async_broker.conf') + conf = { + "port": get_config().get('redis_async_broker_port', '12311'), + "redis_version": get_redis_version() + } + config = template.render(**conf) + write_config_file(bench, 'redis_async_broker.conf', config) diff --git a/bench/templates/redis.conf b/bench/templates/redis.conf deleted file mode 100644 index 60d1eecb..00000000 --- a/bench/templates/redis.conf +++ /dev/null @@ -1,72 +0,0 @@ -activerehashing yes -appendfsync everysec -appendonly no -auto-aof-rewrite-min-size 64mb -auto-aof-rewrite-percentage 100 -daemonize no -databases 16 -dbfilename dump.rdb -list-max-ziplist-entries 512 -list-max-ziplist-value 64 -no-appendfsync-on-rewrite no -pidfile /var/run/redis.pid -port {{port}} -rdbcompression yes -set-max-intset-entries 512 -slave-serve-stale-data yes -slowlog-log-slower-than 10000 -slowlog-max-len 128 -timeout 0 -zset-max-ziplist-entries 128 -zset-max-ziplist-value 64 - -maxmemory {{maxmemory}}mb -maxmemory-policy allkeys-lru - -{% if redis_version == "2.4"%} -hash-max-zipmap-entries 512 -hash-max-zipmap-value 64 -loglevel verbose -vm-enabled no -vm-max-memory 0 -vm-max-threads 4 -vm-page-size 32 -vm-pages 134217728 -vm-swap-file /tmp/redis.swap -{% endif %} - -{% if redis_version == "2.6"%} -aof-rewrite-incremental-fsync yes -client-output-buffer-limit normal 0 0 0 -client-output-buffer-limit pubsub 32mb 8mb 60 -client-output-buffer-limit slave 256mb 64mb 60 -hash-max-ziplist-entries 512 -hash-max-ziplist-value 64 -hz 10 -loglevel notice -lua-time-limit 5000 -rdbchecksum yes -repl-disable-tcp-nodelay no -slave-read-only yes -stop-writes-on-bgsave-error yes -tcp-keepalive 0 -{% endif %} - -{% if redis_version == "2.8"%} -aof-rewrite-incremental-fsync yes -appendfilename "appendonly.aof" -client-output-buffer-limit normal 0 0 0 -client-output-buffer-limit pubsub 32mb 8mb 60 -client-output-buffer-limit slave 256mb 64mb 60 -hash-max-ziplist-entries 512 -hash-max-ziplist-value 64 -hz 10 -logfile "" -loglevel notice -lua-time-limit 5000 -notify-keyspace-events "" -rdbchecksum yes -slave-read-only yes -stop-writes-on-bgsave-error yes -tcp-keepalive 0 -{% endif %} diff --git a/bench/templates/redis_cache.conf b/bench/templates/redis_cache.conf new file mode 100644 index 00000000..dfb35872 --- /dev/null +++ b/bench/templates/redis_cache.conf @@ -0,0 +1,7 @@ +dbfilename redis_cache_dump.rdb +pidfile redis_cache.pid +port {{port}} +maxmemory {{maxmemory}}mb +maxmemory-policy allkeys-lru +save "" +appendonly no \ No newline at end of file diff --git a/bench/templates/supervisor.conf b/bench/templates/supervisor.conf index 26356733..e24e42c3 100644 --- a/bench/templates/supervisor.conf +++ b/bench/templates/supervisor.conf @@ -31,12 +31,24 @@ directory={{ sites_dir }} {% if frappe_version > 4%} [program:redis-cache] -command={{ redis_server }} {{ redis_config }} +command={{ redis_server }} {{ redis_cache_config }} autostart=true autorestart=true stopsignal=QUIT -stdout_logfile={{ bench_dir }}/logs/redis.log -stderr_logfile={{ bench_dir }}/logs/redis.error.log +stdout_logfile={{ bench_dir }}/logs/redis-cache.log +stderr_logfile={{ bench_dir }}/logs/redis-cache.error.log +user={{ user }} +directory={{ sites_dir }} +{% endif %} + +{% if frappe_version > 5%} +[program:redis-async-broker] +command={{ redis_server }} {{ redis_async_broker_config }} +autostart=true +autorestart=true +stopsignal=QUIT +stdout_logfile={{ bench_dir }}/logs/redis-async-broker.log +stderr_logfile={{ bench_dir }}/logs/redis-async-broker.error.log user={{ user }} directory={{ sites_dir }} {% endif %} diff --git a/bench/utils.py b/bench/utils.py index ed7a9730..81c1bf36 100644 --- a/bench/utils.py +++ b/bench/utils.py @@ -7,6 +7,7 @@ import logging import itertools import requests import json +import select import multiprocessing from distutils.spawn import find_executable import pwd, grp @@ -15,6 +16,10 @@ import pwd, grp class PatchError(Exception): pass + +class CommandFailedError(Exception): + pass + logger = logging.getLogger(__name__) @@ -75,18 +80,20 @@ def init(path, apps_path=None, no_procfile=False, no_backups=False, generate_redis_config(bench=path) def exec_cmd(cmd, cwd='.'): - try: - subprocess.check_call(cmd, cwd=cwd, shell=True) - except subprocess.CalledProcessError, e: - print "Error:", getattr(e, "output", None) or getattr(e, "error", None) - raise + p = subprocess.Popen(cmd, cwd=cwd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return_code = print_output(p) + if return_code > 0: + raise CommandFailedError(cmd) def setup_env(bench='.'): exec_cmd('virtualenv -q {} -p {}'.format('env', sys.executable), cwd=bench) exec_cmd('./env/bin/pip -q install wheel', cwd=bench) exec_cmd('./env/bin/pip -q install https://github.com/frappe/MySQLdb1/archive/MySQLdb-1.2.5-patched.tar.gz', cwd=bench) -def setup_procfile(bench='.'): +def setup_socketio(bench='.'): + exec_cmd("npm install nodemon socket.io redis express superagent cookie", cwd=bench) + +def setup_procfile(with_celery_broker=False, with_watch=False, bench='.'): from .app import get_current_frappe_version frappe_version = get_current_frappe_version() procfile_contents = { @@ -95,9 +102,15 @@ def setup_procfile(bench='.'): 'workerbeat': "sh -c 'cd sites && exec ../env/bin/python -m frappe.celery_app beat -s scheduler.schedule'" } if frappe_version > 4: - procfile_contents['redis_cache'] = "redis-server config/redis.conf" + procfile_contents['redis_cache'] = "redis-server config/redis_cache.conf" + procfile_contents['redis_async_broker'] = "redis-server config/redis_async_broker.conf" procfile_contents['web'] = "bench serve" - + procfile_contents['socketio'] = "./node_modules/.bin/nodemon apps/frappe/socketio.js" + if with_celery_broker: + procfile_contents['redis_celery'] = "redis-server" + if with_watch: + procfile_contents['watch'] = "bench watch" + procfile = '\n'.join(["{0}: {1}".format(k, v) for k, v in procfile_contents.items()]) with open(os.path.join(bench, 'Procfile'), 'w') as f: @@ -283,16 +296,13 @@ def update_site_config(site, new_config, bench='.'): put_site_config(site, config, bench=bench) def set_nginx_port(site, port, bench='.', gen_config=True): - set_site_config_nginx_property(site, {"nginx_port": port}, bench=bench) + set_site_config_nginx_property(site, {"nginx_port": port}, bench=bench, gen_config=gen_config) def set_ssl_certificate(site, ssl_certificate, bench='.', gen_config=True): - set_site_config_nginx_property(site, {"ssl_certificate": ssl_certificate}, bench=bench) + set_site_config_nginx_property(site, {"ssl_certificate": ssl_certificate}, bench=bench, gen_config=gen_config) def set_ssl_certificate_key(site, ssl_certificate_key, bench='.', gen_config=True): - set_site_config_nginx_property(site, {"ssl_certificate_key": ssl_certificate_key}, bench=bench) - -def set_nginx_port(site, port, bench='.', gen_config=True): - set_site_config_nginx_property(site, {"nginx_port": port}, bench=bench) + set_site_config_nginx_property(site, {"ssl_certificate_key": ssl_certificate_key}, bench=bench, gen_config=gen_config) def set_site_config_nginx_property(site, config, bench='.', gen_config=True): from .config import generate_nginx_config @@ -300,7 +310,7 @@ def set_site_config_nginx_property(site, config, bench='.', gen_config=True): raise Exception("No such site") update_site_config(site, config, bench=bench) if gen_config: - generate_nginx_config() + generate_nginx_config(bench=bench) def set_url_root(site, url_root, bench='.'): update_site_config(site, {"host_name": url_root}, bench=bench) @@ -434,7 +444,16 @@ def run_frappe_cmd(*args, **kwargs): bench = kwargs.get('bench', '.') f = get_env_cmd('python', bench=bench) sites_dir = os.path.join(bench, 'sites') - subprocess.check_call((f, '-m', 'frappe.utils.bench_helper', 'frappe') + args, cwd=sites_dir) + p = subprocess.Popen((f, '-m', 'frappe.utils.bench_helper', 'frappe') + args, cwd=sites_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return_code = print_output(p) + if return_code > 0: + raise CommandFailedError(args) + +def get_frappe_cmd_output(*args, **kwargs): + bench = kwargs.get('bench', '.') + f = get_env_cmd('python', bench=bench) + sites_dir = os.path.join(bench, 'sites') + return subprocess.check_output((f, '-m', 'frappe.utils.bench_helper', 'frappe') + args, cwd=sites_dir) def pre_upgrade(from_ver, to_ver, bench='.'): @@ -506,5 +525,33 @@ def update_translations(app, lang): f.write(r.text.encode('utf-8')) print 'downloaded for', app, lang - + +def print_output(p): + while p.poll() is None: + readx = select.select([p.stdout.fileno(), p.stderr.fileno()], [], [])[0] + send_buffer = [] + for fd in readx: + if fd == p.stdout.fileno(): + while 1: + buf = p.stdout.read(1) + if not len(buf): + break + if buf == '\r' or buf == '\n': + send_buffer.append(buf) + log_line(''.join(send_buffer), 'stdout') + send_buffer = [] + else: + send_buffer.append(buf) + + if fd == p.stderr.fileno(): + log_line(p.stderr.readline(), 'stderr') + return p.poll() + + +def log_line(data, stream): + if stream == 'stderr': + return sys.stderr.write(data) + return sys.stdout.write(data) + + FRAPPE_VERSION = get_current_frappe_version()