2
1
mirror of https://github.com/qpdf/qpdf.git synced 2024-12-22 19:08:59 +00:00
qpdf/generate_auto_job
Jay Berkenbilt 511d70166d Check generated QPDFJob files in CI
This is to catch pull requests that make changes that would affect
automatically generated job files without including those
modifications.
2022-02-08 11:51:15 -05:00

893 lines
38 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import sys
import argparse
import hashlib
import re
import yaml
import json
import filecmp
from contextlib import contextmanager
# The purpose of this code is to automatically generate various parts
# of the QPDFJob class. It is fairly complicated and extremely
# bespoke, so understanding it is important if modifications are to be
# made.
# Documentation of QPDFJob is divided among three places:
#
# * "HOW TO ADD A COMMAND-LINE ARGUMENT" in README-maintainer provides
# a quick reminder for how to add a command-line argument
#
# * This file has a detailed explanation about how QPDFJob and
# generate_auto_job work together
#
# * The manual ("QPDFJob Design" in qpdf-job.rst) discusses the design
# approach, rationale, and evolution of QPDFJob.
#
# QPDFJob solved the problem of moving extensive functionality that
# lived in qpdf.cc into the library. The QPDFJob class consists of
# four major sections:
#
# * The run() method and its subsidiaries are responsible for
# performing the actual operations on PDF files. This is implemented
# in QPDFJob.cc
#
# * The nested Config class and the other classes it creates provide
# an API for setting up a QPDFJob instance and correspond to the
# command-line arguments of the qpdf executable. This is implemented
# in QPDFJob_config.cc
#
# * The argument parsing code reads an argv array and calls
# configuration methods. This is implemented in QPDFJob_argv.cc. The
# argument parsing logic itself is implemented in the QPDFArgParser
# class.
#
# * The job JSON handling code, which reads a QPDFJob JSON file and
# calls configuration methods. This is implemented in
# QPDFJob_json.cc. The JSON parsing code is in the JSON class. A
# sax-like JSON handler class that calls callbacks in response to
# items in the JSON is implemented in the JSONHandler class.
#
# This code has the job of ensuring that configuration, command-line
# arguments, and JSON are all consistent and complete so that a
# developer or user can freely move among those different ways of
# interacting with QPDFJob in a predictable fashion. In addition, help
# information for each option appears in manual/cli.rst, and that
# information is used in the creation of the job JSON schema and to supply
# help text to QPDFArgParser. This code also ensures that there is an
# exact match between options in job.yml and options in cli.rst.
#
# The job.yml file contains the data that drives this code. To
# understand job.yml, here are some important concepts.
#
# QPDFArgParser option table. There is support for positional
# arguments, options consisting of flags and optional parameters, and
# subparsers that start with a regular parameterless flag, have their
# own positional and option sections, and are terminated with -- by
# itself. Examples of this include --encrypt and --pages. An "option
# table" contains an optional positional argument handler and a list
# of valid options with specifications about their parameters. There
# are three kinds of option tables:
#
# * The built-in "help" option table contains help commands, like
# --help and --version, that are only valid when they appear as the
# single command-line argument.
#
# * The "main" option table contains the options that are valid
# starting at the beginning of argument parsing.
#
# * A named option table can be started manually by the argument
# parsing code to switch the argument parser's context. Switching
# the parser to a new option table is manual (via a call to
# selectOptionTable). Context reverts to the main option table
# automatically when -- is encountered.
#
# In QPDFJob.hh, there is a Config class for each option table except
# help.
#
# Option type: bare, required/optional parameter, required/optional
# choices. A bare argument is just a flag, like --qdf. A parameter
# option takes an arbitrary parameter, like --password. A choices
# option takes one of a fixed list of choices, like --object-streams.
# If a parameter or choices option's parameter is option, the empty
# string may be specified as an option, such as --collate (or
# --collate=). For a bare option, --option= is always the same as just
# --option. This makes it possible to switch an option from bare to
# optional choice to optional parameter all without breaking
# compatibility.
#
# JSON "schema". This is a qpdf-specific "schema" for JSON. It is not
# related to any kind of standard JSON schema. It is described in
# JSON.hh and in the manual. QPDFJob uses the JSON "schema" in a mode
# in which keys in the schema are all optional in the JSON object.
#
# Here is the mapping between configuration, argv, and JSON.
#
# The help options table is implemented solely for argv processing and
# has no counterpart in configuration or JSON.
#
# The config() method returns a shared pointer to a Config object.
# Every command-line option in the main option table has a
# corresponding method in Config whose name is the option converted to
# camel case. For bare options and options with optional parameters, a
# version exists that takes no arguments. For other than bare options,
# a version exist, possibly in addition, that takes a std::string
# const&. For example, the --qdf flag implies a qdf() method in
# Config, and the --object-streams flag implies an
# objectStreams(std::string const&) method in Config. For flags in
# option tables, the method is declared inside a config class specific
# to the option table. The mapping between option tables and config
# classes is explicit in job.yml. Positional arguments are handled
# individually and manually -- see QPDFJob.hh in the CONFIGURATION
# section for details. See examples/qpdf-job.cc for an example.
#
# To understand the rest, start at main and follow comments in the
# code.
whoami = os.path.basename(sys.argv[0])
BANNER = f'''//
// This file is automatically generated by {whoami}.
// Edits will be automatically overwritten if the build is
// run in maintainer mode.
//'''
def warn(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
@contextmanager
def write_file(filename):
tmpfile = filename + '.tmp'
with open(tmpfile, 'w') as f:
yield f
if os.path.exists(filename) and filecmp.cmp(filename, tmpfile, False):
os.unlink(tmpfile)
else:
os.rename(tmpfile, filename)
class Main:
# SOURCES is a list of source files whose contents are used by
# this program. If they change, we are out of date.
SOURCES = [
whoami,
'manual/_ext/qpdf.py',
'job.yml',
'manual/cli.rst',
]
# DESTS is a map to the output files this code generates. These
# generated files, as well as those added to DESTS later in the
# code, are included in various places by QPDFJob.hh or any of the
# implementing QPDFJob*.cc files.
DESTS = {
'decl': 'libqpdf/qpdf/auto_job_decl.hh',
'init': 'libqpdf/qpdf/auto_job_init.hh',
'help': 'libqpdf/qpdf/auto_job_help.hh',
'schema': 'libqpdf/qpdf/auto_job_schema.hh',
'json_decl': 'libqpdf/qpdf/auto_job_json_decl.hh',
'json_init': 'libqpdf/qpdf/auto_job_json_init.hh',
# Others are added in top
}
# SUMS contains a checksum for each source and destination and is
# used to detect whether we're up to date without having to force
# recompilation all the time. This way the build can invoke this
# script unconditionally without causing stuff to rebuild every
# time.
SUMS = 'job.sums'
def main(self, args=sys.argv[1:], prog=whoami):
options = self.parse_args(args, prog)
self.top(options)
def parse_args(self, args, prog):
parser = argparse.ArgumentParser(
prog=prog,
description='Generate files for QPDFJob',
)
mxg = parser.add_mutually_exclusive_group(required=True)
mxg.add_argument('--check',
help='update checksums if files are not up to date',
action='store_true', default=False)
mxg.add_argument('--generate',
help='generate files from sources',
action='store_true', default=False)
return parser.parse_args(args)
def top(self, options):
with open('job.yml', 'r') as f:
data = yaml.safe_load(f.read())
# config_decls maps a config key from an option in "options"
# (from job.yml) to a list of declarations. A declaration is
# generated for each config method for that option table.
self.config_decls = {}
# Keep track of which configs we've declared since we can have
# option tables share a config class, as with the encryption
# tables.
self.declared_configs = set()
# Update DESTS -- see above. This ensures that each config
# class's contents are included in job.sums.
for o in data['options']:
config = o.get('config', None)
if config is not None:
self.DESTS[config] = f'include/qpdf/auto_job_{config}.hh'
self.config_decls[config] = []
if self.check_hashes():
exit(0)
elif options.check:
exit(f'{whoami}: auto job inputs have changed')
elif options.generate:
self.generate(data)
else:
exit(f'{whoami} unknown mode')
def get_hashes(self):
hashes = {}
for i in sorted([*self.SOURCES, *self.DESTS.values()]):
m = hashlib.sha256()
try:
with open(i, 'rb') as f:
m.update(f.read())
hashes[i] = m.hexdigest()
except FileNotFoundError:
pass
return hashes
def check_hashes(self):
hashes = self.get_hashes()
match = False
try:
old_hashes = {}
with open(self.SUMS, 'r') as f:
for line in f.readlines():
m = re.match(r'^(\S+) (\S+)\s*$', line)
if m:
old_hashes[m.group(1)] = m.group(2)
match = old_hashes == hashes
if not match:
# Write to stdout, not stderr. What we write to stderr
# is visible in a normal build. Writing to stdout will
# hide it in that case but expose it if you directly
# run ./generate_auto_job --check as in CI.
print(f'*** {whoami} hash mismatches ***')
match = False
for k, v in hashes.items():
if k not in old_hashes:
print(f' {k} is not in job.sums')
elif v != old_hashes[k]:
print(f' {k} was modified')
for k in old_hashes:
if k not in hashes:
print(f' {k} disappeared')
except Exception:
pass
return match
def update_hashes(self):
hashes = self.get_hashes()
with open(self.SUMS, 'w') as f:
print(f'# Generated by {whoami}', file=f)
for k, v in hashes.items():
print(f'{k} {v}', file=f)
def generate_doc(self, df, f):
st_top = 0
st_topic = 1
st_option = 2
st_option_help = 3
state = st_top
indent = None
topic = None
option = None
short_text = None
long_text = None
# Generate a bunch of short static functions rather than a big
# member function for help. Some compilers have problems with
# very large member functions in classes in anonymous
# namespaces.
help_files = 0
help_lines = 0
self.all_topics = set(self.options_without_help)
self.referenced_topics = set()
def set_indent(x):
nonlocal indent
indent = ' ' * len(x)
def append_long_text(line, topic):
nonlocal indent, long_text
if line == '\n':
long_text += '\n'
elif line.startswith(indent):
long_text += line[len(indent):]
else:
long_text = long_text.strip()
if long_text == '':
raise Exception(f'missing long text for {topic}')
long_text += '\n'
if 'help' not in topic:
# Help for --help itself has --help=... not
# referring to specific options.
for i in re.finditer(r'--help=([^\.\s]+)', long_text):
self.referenced_topics.add(i.group(1))
return True
return False
lineno = 0
for line in df.readlines():
if help_lines == 0:
if help_files > 0:
print('}', file=f)
help_files += 1
help_lines += 1
print(f'static void add_help_{help_files}(QPDFArgParser& ap)\n'
'{', file=f)
lineno += 1
if state == st_top:
m = re.match(r'^(\s*\.\. )help-topic (\S+): (.*)$', line)
if m:
set_indent(m.group(1))
topic = m.group(2)
short_text = m.group(3)
long_text = ''
state = st_topic
continue
m = re.match(
r'^(\s*\.\. )qpdf:option:: (([^=\[\s]+)([\[= ](.+))?)$',
line)
if m:
if topic is None:
raise Exception('option seen before topic')
set_indent(m.group(1))
option = m.group(3)
synopsis = m.group(2)
if synopsis.endswith('`'):
raise Exception(
f'stray ` at end of option line (line {lineno})')
if synopsis != option:
long_text = synopsis + '\n'
else:
long_text = ''
state = st_option
continue
elif state == st_topic:
if append_long_text(line, topic):
self.all_topics.add(topic)
print(f'ap.addHelpTopic("{topic}", "{short_text}",'
f' R"({long_text})");', file=f)
help_lines += 1
state = st_top
elif state == st_option:
if line == '\n' or line.startswith(indent):
m = re.match(r'^(\s*\.\. )help: (.*)$', line)
if m:
set_indent(m.group(1))
short_text = m.group(2)
state = st_option_help
else:
raise Exception('option without help text')
state = st_top
elif state == st_option_help:
if append_long_text(line, option):
if option in self.options_without_help:
self.options_without_help.remove(option)
else:
raise Exception(
f'help for unknown option {option},'
f' lineno={lineno}')
if option not in self.help_options:
self.jdata[option[2:]]['help'] = short_text
print(f'ap.addOptionHelp("{option}", "{topic}",'
f' "{short_text}", R"({long_text})");', file=f)
help_lines += 1
state = st_top
if help_lines == 20:
help_lines = 0
print('}', file=f)
print('static void add_help(QPDFArgParser& ap)\n{', file=f)
for i in range(help_files):
print(f' add_help_{i+1}(ap);', file=f)
print('ap.addHelpFooter("For detailed help, visit'
' the qpdf manual: https://qpdf.readthedocs.io\\n");', file=f)
print('}\n', file=f)
for i in self.referenced_topics:
if i not in self.all_topics:
raise Exception(f'help text referenced --help={i}')
for i in self.options_without_help:
raise Exception(
'Options without help: ' +
', '.join(self.options_without_help))
def generate(self, data):
warn(f'{whoami}: regenerating auto job files')
self.validate(data)
# Keep track of which options are help options since they are
# handled specially. Add the built-in help options to tables
# that we populate as we read job.yml since we won't encounter
# these in job.yml
self.help_options = set(
['--completion-bash', '--completion-zsh', '--help']
)
# Keep track of which options we have encountered but haven't
# seen help text for. This enables us to report if any option
# is missing help.
self.options_without_help = set(self.help_options)
# Compute the information needed for generated files and write
# the files.
self.prepare(data)
with write_file(self.DESTS['decl']) as f:
print(BANNER, file=f)
for i in self.decls:
print(i, file=f)
with write_file(self.DESTS['init']) as f:
print(BANNER, file=f)
for i in self.init:
print(i, file=f)
with write_file(self.DESTS['help']) as f:
with open('manual/cli.rst', 'r') as df:
print(BANNER, file=f)
self.generate_doc(df, f)
# Compute the json files after the config and arg parsing
# files. We need to have full information about all the
# options before we can generate the schema. Generating the
# schema also generates the json header files.
self.generate_schema(data)
with write_file(self.DESTS['schema']) as f:
print('static constexpr char const* JOB_SCHEMA_DATA = R"(' +
json.dumps(self.schema, indent=2, separators=(',', ': ')) +
')";', file=f)
for k, v in self.config_decls.items():
with write_file(self.DESTS[k]) as f:
print(BANNER, file=f)
for i in v:
print(i, file=f)
with write_file(self.DESTS['json_decl']) as f:
print(BANNER, file=f)
for i in self.json_decls:
print(i, file=f)
with write_file(self.DESTS['json_init']) as f:
print(BANNER, file=f)
for i in self.json_init:
print(i, file=f)
# Update hashes last to ensure that this will be rerun in the
# event of a failure.
self.update_hashes()
# DON'T ADD CODE TO generate AFTER update_hashes
def handle_trivial(self, i, identifier, cfg, prefix, kind, v):
# A "trivial" option is one whose handler does nothing other
# than to call the config method with the same name (switched
# to camelCase).
decl_arg = 1
decl_arg_optional = False
if kind == 'bare':
decl_arg = 0
self.init.append(f'this->ap.addBare("{i}", '
f'[this](){{{cfg}->{identifier}();}});')
elif kind == 'required_parameter':
self.init.append(
f'this->ap.addRequiredParameter("{i}", '
f'[this](std::string const& x){{{cfg}->{identifier}(x);}}'
f', "{v}");')
elif kind == 'optional_parameter':
decl_arg_optional = True
self.init.append(
f'this->ap.addOptionalParameter("{i}", '
f'[this](std::string const& x){{{cfg}->{identifier}(x);}});')
elif kind == 'required_choices':
self.init.append(
f'this->ap.addChoices("{i}", '
f'[this](std::string const& x){{{cfg}->{identifier}(x);}}'
f', true, {v}_choices);')
elif kind == 'optional_choices':
decl_arg_optional = True
self.init.append(
f'this->ap.addChoices("{i}", '
f'[this](std::string const& x){{{cfg}->{identifier}(x);}}'
f', false, {v}_choices);')
# Generate declarations for config methods separately by
# config object.
config_prefix = prefix + 'Config'
arg = ''
if decl_arg:
arg = 'std::string const& parameter'
fn = f'{config_prefix}* {identifier}({arg})'
if fn not in self.declared_configs:
self.declared_configs.add(fn)
self.config_decls[cfg].append(f'QPDF_DLL {fn};')
if decl_arg_optional:
# Rather than making the parameter optional, add an
# overloaded method that takes no arguments. This
# strategy enables us to change an option from bare to
# optional_parameter or optional_choices without
# breaking binary compatibility. The overloaded
# methods both have to be implemented manually. They
# are not automatically called, so if you forget,
# someone will get a link error if they try to call
# one.
self.config_decls[cfg].append(
f'QPDF_DLL {config_prefix}* {identifier}();')
def handle_flag(self, i, identifier, kind, v):
# For flags that require manual handlers, declare the handler
# and register it. They have to be implemented manually in
# QPDFJob_argv.cc. You get compiler/linker errors for any
# missing methods.
if kind == 'bare':
self.decls.append(f'void {identifier}();')
self.init.append(f'this->ap.addBare("{i}", '
f'b(&ArgParser::{identifier}));')
elif kind == 'required_parameter':
self.decls.append(f'void {identifier}(std::string const&);')
self.init.append(f'this->ap.addRequiredParameter("{i}", '
f'p(&ArgParser::{identifier})'
f', "{v}");')
elif kind == 'optional_parameter':
self.decls.append(f'void {identifier}(std::string const&);')
self.init.append(f'this->ap.addOptionalParameter("{i}", '
f'p(&ArgParser::{identifier}));')
elif kind == 'required_choices':
self.decls.append(f'void {identifier}(std::string const&);')
self.init.append(f'this->ap.addChoices("{i}", '
f'p(&ArgParser::{identifier})'
f', true, {v}_choices);')
elif kind == 'optional_choices':
self.decls.append(f'void {identifier}(std::string const&);')
self.init.append(f'this->ap.addChoices("{i}", '
f'p(&ArgParser::{identifier})'
f', false, {v}_choices);')
def prepare(self, data):
self.decls = [] # argv handler declarations
self.init = [] # initialize arg parsing code
self.json_decls = [] # json handler declarations
self.json_init = [] # initialize json handlers
self.jdata = {} # running data used for json generate
self.by_table = {} # table information by name for easy lookup
def add_jdata(flag, table, details):
# Keep track of each flag and where it appears so we can
# check consistency between the json information and the
# options section.
nonlocal self
if table == 'help':
self.help_options.add(f'--{flag}')
elif flag in self.jdata:
self.jdata[flag]['tables'][table] = details
else:
self.jdata[flag] = {
'tables': {table: details},
}
# helper functions
self.init.append('auto b = [this](void (ArgParser::*f)()) {')
self.init.append(' return QPDFArgParser::bindBare(f, this);')
self.init.append('};')
self.init.append(
'auto p = [this](void (ArgParser::*f)(std::string const&)) {')
self.init.append(' return QPDFArgParser::bindParam(f, this);')
self.init.append('};')
self.init.append('')
# static variables for each set of choices for choices options
for k, v in data['choices'].items():
s = f'static char const* {k}_choices[] = {{'
for i in v:
s += f'"{i}", '
s += '0};'
self.init.append(s)
self.json_init.append(s)
self.init.append('')
self.json_init.append('')
# constants for the table names to reduce hard-coding strings
# in the handlers
for o in data['options']:
table = o['table']
if table in ('main', 'help'):
continue
i = self.to_identifier(table, 'O', True)
self.decls.append(f'static constexpr char const* {i} = "{table}";')
self.decls.append('')
# Walk through all the options adding declarations for the
# option handlers and initialization code to register the
# handlers in QPDFArgParser. For "trivial" cases,
# QPDFArgParser will call the corresponding config method
# automatically. Otherwise, it will declare a handler that you
# have to explicitly implement.
# If you add a new option table, you have to set config to the
# name of a member variable that you declare in the ArgParser
# class in QPDFJob_argv.cc. Then there should be an option in
# the main table, also listed as manual in job.yml, that
# switches to it. See implementations of any of the existing
# options that do this for examples.
for o in data['options']:
table = o['table']
config = o.get('config', None)
table_prefix = o.get('prefix', '')
arg_prefix = 'arg' + table_prefix
config_prefix = o.get('config_prefix', table_prefix)
manual = o.get('manual', [])
json_prefix = table_prefix or table
self.by_table[json_prefix] = {
'config': config,
'manual': manual,
}
if table == 'main':
self.init.append('this->ap.selectMainOptionTable();')
elif table == 'help':
self.init.append('this->ap.selectHelpOptionTable();')
else:
identifier = self.to_identifier(table, 'argEnd', False)
self.init.append(f'this->ap.registerOptionTable("{table}",'
f' b(&ArgParser::{identifier}));')
if o.get('positional', False):
self.decls.append(
f'void {arg_prefix}Positional(std::string const&);')
self.init.append('this->ap.addPositional('
f'p(&ArgParser::{arg_prefix}Positional));')
flags = {}
for i in o.get('bare', []):
flags[i] = ['bare', None]
for i, v in o.get('required_parameter', {}).items():
flags[i] = ['required_parameter', v]
for i in o.get('optional_parameter', []):
flags[i] = ['optional_parameter', None]
for i, v in o.get('required_choices', {}).items():
flags[i] = ['required_choices', v]
for i, v in o.get('optional_choices', {}).items():
flags[i] = ['optional_choices', v]
self.options_without_help.add(f'--{i}')
for i, [kind, v] in flags.items():
self.options_without_help.add(f'--{i}')
add_jdata(i, json_prefix, [kind, v])
if config is None or i in manual:
identifier = self.to_identifier(i, arg_prefix, False)
self.handle_flag(i, identifier, kind, v)
else:
identifier = self.to_identifier(i, '', False)
self.handle_trivial(
i, identifier, config, config_prefix, kind, v)
# Subsidiary options tables need end methods to do any
# final checking within the option table. Final checking
# for the main option table is handled by
# checkConfiguration, which is called explicitly in the
# QPDFJob code.
if table not in ('main', 'help'):
identifier = self.to_identifier(table, 'argEnd', False)
self.decls.append(f'void {identifier}();')
def handle_json_trivial(self, flag_key, fdata):
config = None
for t, [kind, v] in fdata['tables'].items():
# We have determined that all tables, if multiple, have
# the same config.
tdata = self.by_table[t]
config = tdata['config']
if kind == 'bare':
self.json_init.append(
f'addBare([this]() {{ {config}->{flag_key}(); }});')
elif kind == 'required_parameter' or kind == 'optional_parameter':
# Optional parameters end up just being the empty string,
# so the handler has to deal with it. The empty string is
# also allowed for non-optional.
self.json_init.append(
f'addParameter([this](std::string const& p)'
f' {{ {config}->{flag_key}(p); }});')
elif kind == 'required_choices':
self.json_init.append(
f'addChoices({v}_choices, true,'
f' [this](std::string const& p)'
f' {{ {config}->{flag_key}(p); }});')
elif kind == 'optional_choices':
self.json_init.append(
f'addChoices({v}_choices, false,'
f' [this](std::string const& p)'
f' {{ {config}->{flag_key}(p); }});')
def handle_json_manual(self, path):
method = re.sub(r'\.([a-zA-Z0-9])',
lambda x: x.group(1).upper(),
f'setup{path}')
self.json_decls.append(f'void {method}();')
self.json_init.append(f'{method}();')
def option_to_json_key(self, s):
return self.to_identifier(s, '', False)
def flag_to_schema_key(self, k):
if k.startswith('_'):
schema_key = k[1:]
else:
schema_key = re.sub(r'[^\.]+\.', '', k)
return self.option_to_json_key(schema_key)
def build_schema(self, j, path, flag, expected, options_seen):
# j: the part of data from "json" in job.yml as we traverse it
# path: a string representation of the path in the json
# flag: the command-line flag
# expected: a map of command-line options we expect to eventually see
# options_seen: which options we have seen so far
# As described in job.yml, the json can have keys that don't
# map to options. This includes keys whose values are
# dictionaries as well as keys that correspond to positional
# arguments. These start with _ and get their help from
# job.yml. Things that correspond to options get their help
# from the help text we gathered from cli.rst.
if flag in expected:
options_seen.add(flag)
elif isinstance(j, str):
if not flag.startswith('_'):
raise Exception(f'json: {flag} has a description'
' but doesn\'t start with _')
elif not (flag == '' or flag.startswith('_')):
raise Exception(f'json: unknown key {flag}')
# The logic here is subtle and makes sense if you understand
# how our JSON schemas work. They are described in JSON.hh,
# but basically, if you see a dictionary, the schema should
# have a dictionary with the same keys whose values are
# descriptive. If you see an array, the array should have
# single member that describes each element of the array. See
# JSON.hh for details.
# See comments in QPDFJob_json.cc in the Handlers class
# declaration to understand how and why the methods called
# here work. The idea is that Handlers keeps a stack of
# JSONHandler shared pointers so that we can register our
# handlers in the right place as we go.
if isinstance(j, dict):
schema_value = {}
if flag:
identifier = self.to_identifier(path, '', False)
self.json_decls.append(f'void begin{identifier}(JSON);')
self.json_decls.append(f'void end{identifier}();')
self.json_init.append(
f'beginDict(bindJSON(&Handlers::begin{identifier}),'
f' bindBare(&Handlers::end{identifier})); // {path}')
for k, v in j.items():
schema_key = self.flag_to_schema_key(k)
subpath = f'{path}.{schema_key}'
self.json_init.append(f'pushKey("{schema_key}");')
schema_value[schema_key] = self.build_schema(
v, subpath, k, expected, options_seen)
self.json_init.append(f'popHandler(); // key: {schema_key}')
elif isinstance(j, list):
if len(j) != 1:
raise Exception('json contains array with length != 1')
identifier = self.to_identifier(path, '', False)
self.json_decls.append(f'void begin{identifier}Array(JSON);')
self.json_decls.append(f'void end{identifier}Array();')
self.json_init.append(
f'beginArray(bindJSON(&Handlers::begin{identifier}Array),'
f' bindBare(&Handlers::end{identifier}Array));'
f' // {path}[]')
schema_value = [
self.build_schema(j[0], path, flag,
expected, options_seen)
]
self.json_init.append(
f'popHandler(); // array: {path}[]')
else:
schema_value = j
if schema_value is None:
schema_value = re.sub(
r'--([^\s=]+)',
lambda x: self.option_to_json_key(x.group(1)),
expected[flag]['help'])
is_trivial = False
if flag in expected:
is_trivial = True
common_config = None
for t in expected[flag]['tables']:
tdata = self.by_table[t]
if flag in tdata['manual']:
is_trivial = False
if common_config is None:
common_config = tdata['config']
elif common_config != tdata['config']:
is_trivial = False
config_key = self.flag_to_schema_key(flag)
if is_trivial:
self.handle_json_trivial(config_key, expected[flag])
else:
self.handle_json_manual(path)
return schema_value
def generate_schema(self, data):
# Check to make sure that every command-line option is
# represented in data['json']. Build a list of options that we
# expect. If an option appears once, we just expect to see it
# once. If it appears in more than one options table, we need
# to see a separate version of it for each option table. It is
# represented in job.yml prepended with the table prefix. The
# table prefix is removed in the schema. Example: "password"
# appears multiple times, so the json section of job.yml has
# main.password, uo.password, etc. But most options appear
# only once, so we can just list them as they are. There is a
# nearly exact match between option tables and dictionary in
# the job json schema, but it's not perfect because of how
# positional arguments are handled, so we have to do this
# extra work. Information about which tables a particular
# option appeared in is gathered up in prepare().
expected = {}
for k, v in self.jdata.items():
tables = v['tables']
if len(tables) == 1:
expected[k] = {**v}
else:
for t in sorted(tables):
expected[f'{t}.{k}'] = {**v}
options_seen = set()
# Walk through the json information building the schema as we
# go. This verifies consistency between command-line options
# and the json section of the data and builds up a schema by
# populating with help information as available. In addition
# to generating the schema, we declare and register json
# handlers that correspond with it. That way, we can first
# check a job JSON file against the schema, and if it matches,
# we have fewer error opportunities while calling handlers.
self.schema = self.build_schema(
data['json'], '', '', expected, options_seen)
if options_seen != set(expected.keys()):
raise Exception('missing from json: ' +
str(set(expected.keys()) - options_seen))
def check_keys(self, what, d, exp):
if not isinstance(d, dict):
exit(f'{what} is not a dictionary')
actual = set(d.keys())
extra = actual - exp
if extra:
exit(f'{what}: unknown keys = {extra}')
def validate(self, data):
self.check_keys('top', data, set(
['choices', 'options', 'json']))
for o in data['options']:
self.check_keys('top', o, set(
['table', 'prefix', 'config', 'config_prefix',
'manual', 'bare', 'positional',
'optional_parameter', 'required_parameter',
'required_choices', 'optional_choices']))
def to_identifier(self, label, prefix, const):
identifier = re.sub(r'[^a-zA-Z0-9]', '_', label)
if const:
identifier = f'{prefix}_{identifier.upper()}'
else:
if prefix:
identifier = f'{prefix}_{identifier}'
identifier = re.sub(r'_([a-z])',
lambda x: x.group(1).upper(),
identifier).replace('_', '')
return identifier
if __name__ == '__main__':
try:
os.chdir(os.path.dirname(os.path.realpath(__file__)))
Main().main()
except KeyboardInterrupt:
exit(130)