mirror of
https://github.com/frappe/frappe.git
synced 2024-06-12 16:42:20 +00:00
refactor(treewide): enable RUF
rules
Signed-off-by: Akhil Narang <me@akhilnarang.dev>
This commit is contained in:
parent
718b5b8bee
commit
3f1e19de85
14
.github/helper/ci.py
vendored
14
.github/helper/ci.py
vendored
|
@ -34,19 +34,7 @@ TESTED_VIA_CLI = [
|
|||
"*/frappe/database/**/setup_db.py",
|
||||
]
|
||||
|
||||
FRAPPE_EXCLUSIONS = [
|
||||
"*/tests/*",
|
||||
"*/commands/*",
|
||||
"*/frappe/change_log/*",
|
||||
"*/frappe/exceptions*",
|
||||
"*/frappe/desk/page/setup_wizard/setup_wizard.py",
|
||||
"*/frappe/coverage.py",
|
||||
"*frappe/setup.py",
|
||||
"*/frappe/hooks.py",
|
||||
"*/doctype/*/*_dashboard.py",
|
||||
"*/patches/*",
|
||||
"*/.github/helper/ci.py",
|
||||
] + TESTED_VIA_CLI
|
||||
FRAPPE_EXCLUSIONS = ["*/tests/*", "*/commands/*", "*/frappe/change_log/*", "*/frappe/exceptions*", "*/frappe/desk/page/setup_wizard/setup_wizard.py", "*/frappe/coverage.py", "*frappe/setup.py", "*/frappe/hooks.py", "*/doctype/*/*_dashboard.py", "*/patches/*", "*/.github/helper/ci.py", *TESTED_VIA_CLI]
|
||||
|
||||
|
||||
def get_bench_path():
|
||||
|
|
|
@ -550,7 +550,7 @@ def msgprint(
|
|||
as_list: bool = False,
|
||||
indicator: Literal["blue", "green", "orange", "red", "yellow"] | None = None,
|
||||
alert: bool = False,
|
||||
primary_action: str = None,
|
||||
primary_action: str | None = None,
|
||||
is_minimizable: bool = False,
|
||||
wide: bool = False,
|
||||
*,
|
||||
|
@ -1654,7 +1654,9 @@ def _load_app_hooks(app_name: str | None = None):
|
|||
return hooks
|
||||
|
||||
|
||||
def get_hooks(hook: str = None, default: Any | None = "_KEEP_DEFAULT_LIST", app_name: str = None) -> _dict:
|
||||
def get_hooks(
|
||||
hook: str | None = None, default: Any | None = "_KEEP_DEFAULT_LIST", app_name: str | None = None
|
||||
) -> _dict:
|
||||
"""Get hooks via `app/hooks.py`
|
||||
|
||||
:param hook: Name of the hook. Will gather all hooks for this name and return as a list.
|
||||
|
@ -1900,7 +1902,7 @@ def copy_doc(doc: "Document", ignore_no_copy: bool = True) -> "Document":
|
|||
|
||||
newdoc = get_doc(copy.deepcopy(d))
|
||||
newdoc.set("__islocal", 1)
|
||||
for fieldname in fields_to_clear + ["amended_from", "amendment_date"]:
|
||||
for fieldname in [*fields_to_clear, "amended_from", "amendment_date"]:
|
||||
newdoc.set(fieldname, None)
|
||||
|
||||
if not ignore_no_copy:
|
||||
|
@ -2525,7 +2527,7 @@ def validate_and_sanitize_search_inputs(fn):
|
|||
return wrapper
|
||||
|
||||
|
||||
from frappe.utils.error import log_error # noqa
|
||||
from frappe.utils.error import log_error
|
||||
|
||||
if _tune_gc:
|
||||
# generational GC gets triggered after certain allocs (g0) which is 700 by default.
|
||||
|
|
|
@ -22,7 +22,7 @@ import frappe.rate_limiter
|
|||
import frappe.recorder
|
||||
import frappe.utils.response
|
||||
from frappe import _
|
||||
from frappe.auth import SAFE_HTTP_METHODS, UNSAFE_HTTP_METHODS, HTTPRequest, validate_auth # noqa
|
||||
from frappe.auth import SAFE_HTTP_METHODS, UNSAFE_HTTP_METHODS, HTTPRequest, validate_auth
|
||||
from frappe.middlewares import StaticDataMiddleware
|
||||
from frappe.utils import CallbackManager, cint, get_site_name
|
||||
from frappe.utils.data import escape_html
|
||||
|
|
|
@ -223,7 +223,7 @@ class LoginManager:
|
|||
|
||||
clear_sessions(frappe.session.user, keep_current=True)
|
||||
|
||||
def authenticate(self, user: str = None, pwd: str = None):
|
||||
def authenticate(self, user: str | None = None, pwd: str | None = None):
|
||||
from frappe.core.doctype.user.user import User
|
||||
|
||||
if not (user and pwd):
|
||||
|
@ -483,7 +483,7 @@ class LoginAttemptTracker:
|
|||
max_consecutive_login_attempts: int = 3,
|
||||
lock_interval: int = 5 * 60,
|
||||
*,
|
||||
user_name: str = None,
|
||||
user_name: str | None = None,
|
||||
):
|
||||
"""Initialize the tracker.
|
||||
|
||||
|
|
|
@ -41,7 +41,8 @@ global_cache_keys = (
|
|||
"information_schema:counts",
|
||||
"db_tables",
|
||||
"server_script_autocompletion_items",
|
||||
) + doctype_map_keys
|
||||
*doctype_map_keys,
|
||||
)
|
||||
|
||||
user_cache_keys = (
|
||||
"bootinfo",
|
||||
|
@ -107,7 +108,7 @@ def clear_global_cache():
|
|||
|
||||
def clear_defaults_cache(user=None):
|
||||
if user:
|
||||
frappe.cache.hdel("defaults", [user] + common_default_keys)
|
||||
frappe.cache.hdel("defaults", [user, *common_default_keys])
|
||||
elif frappe.flags.in_install != "frappe":
|
||||
frappe.cache.delete_value("defaults")
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ def generate_pot_file(context, app: str | None = None):
|
|||
)
|
||||
@click.option("--locale", help="Compile transaltions only for this locale. eg: de")
|
||||
@pass_context
|
||||
def compile_translations(context, app: str | None = None, locale: str = None, force=False):
|
||||
def compile_translations(context, app: str | None = None, locale: str | None = None, force=False):
|
||||
from frappe.gettext.translate import compile_translations as _compile_translations
|
||||
|
||||
if not app:
|
||||
|
@ -39,7 +39,7 @@ def compile_translations(context, app: str | None = None, locale: str = None, fo
|
|||
@click.option("--app", help="Only migrate for this app. eg: frappe")
|
||||
@click.option("--locale", help="Compile translations only for this locale. eg: de")
|
||||
@pass_context
|
||||
def csv_to_po(context, app: str | None = None, locale: str = None):
|
||||
def csv_to_po(context, app: str | None = None, locale: str | None = None):
|
||||
from frappe.gettext.translate import migrate
|
||||
|
||||
if not app:
|
||||
|
|
|
@ -456,7 +456,7 @@ def install_app(context, apps, force=False):
|
|||
print(f"App {app} is Incompatible with Site {site}{err_msg}")
|
||||
exit_code = 1
|
||||
except Exception as err:
|
||||
err_msg = f": {str(err)}\n{frappe.get_traceback(with_context=True)}"
|
||||
err_msg = f": {err!s}\n{frappe.get_traceback(with_context=True)}"
|
||||
print(f"An error occurred while installing {app}{err_msg}")
|
||||
exit_code = 1
|
||||
|
||||
|
@ -1034,7 +1034,7 @@ def _drop_site(
|
|||
messages = [
|
||||
"=" * 80,
|
||||
f"Error: The operation has stopped because backup of {site}'s database failed.",
|
||||
f"Reason: {str(err)}\n",
|
||||
f"Reason: {err!s}\n",
|
||||
"Fix the issue and try again.",
|
||||
f"Hint: Use 'bench drop-site {site} --force' to force the removal of {site}",
|
||||
]
|
||||
|
|
|
@ -528,7 +528,7 @@ def _enter_console(extra_args=None):
|
|||
_("{} not found in PATH! This is required to access the console.").format(bin_name),
|
||||
exc=frappe.ExecutableNotFound,
|
||||
)
|
||||
os.execv(bin, [bin] + args)
|
||||
os.execv(bin, [bin, *args])
|
||||
|
||||
|
||||
@click.command("jupyter")
|
||||
|
|
|
@ -2,7 +2,7 @@ import frappe
|
|||
from frappe import _
|
||||
|
||||
|
||||
def get_modules_from_all_apps_for_user(user: str = None) -> list[dict]:
|
||||
def get_modules_from_all_apps_for_user(user: str | None = None) -> list[dict]:
|
||||
user = user or frappe.session.user
|
||||
all_modules = get_modules_from_all_apps()
|
||||
global_blocked_modules = frappe.get_doc("User", "Administrator").get_blocked_modules()
|
||||
|
|
|
@ -107,7 +107,7 @@ def get_reference_details(reference_doctype, doctype, reference_list, reference_
|
|||
["Dynamic Link", "link_doctype", "=", reference_doctype],
|
||||
["Dynamic Link", "link_name", "in", reference_list],
|
||||
]
|
||||
fields = ["`tabDynamic Link`.link_name"] + field_map.get(doctype, [])
|
||||
fields = ["`tabDynamic Link`.link_name", *field_map.get(doctype, [])]
|
||||
|
||||
records = frappe.get_list(doctype, filters=filters, fields=fields, as_list=True)
|
||||
temp_records = [d[1:] for d in records]
|
||||
|
|
|
@ -263,7 +263,7 @@ def add_attachments(name: str, attachments: Iterable[str | dict]) -> None:
|
|||
|
||||
|
||||
@frappe.whitelist(allow_guest=True, methods=("GET",))
|
||||
def mark_email_as_seen(name: str = None):
|
||||
def mark_email_as_seen(name: str | None = None):
|
||||
frappe.request.after_response.add(lambda: _mark_email_as_seen(name))
|
||||
frappe.response.update(frappe.utils.get_imaginary_pixel_response())
|
||||
|
||||
|
|
|
@ -274,7 +274,7 @@ def export_json(doctype, path, filters=None, or_filters=None, name=None, order_b
|
|||
for v in doc.values():
|
||||
if isinstance(v, list):
|
||||
for child in v:
|
||||
for key in del_keys + ("docstatus", "doctype", "modified", "name"):
|
||||
for key in (*del_keys, "docstatus", "doctype", "modified", "name"):
|
||||
if key in child:
|
||||
del child[key]
|
||||
|
||||
|
|
|
@ -105,7 +105,7 @@ class Exporter:
|
|||
fields = [df for df in fields if is_exportable(df)]
|
||||
|
||||
if "name" in fieldnames:
|
||||
fields = [name_field] + fields
|
||||
fields = [name_field, *fields]
|
||||
|
||||
return fields or []
|
||||
|
||||
|
@ -162,7 +162,7 @@ class Exporter:
|
|||
parent_data = frappe.db.get_list(
|
||||
self.doctype,
|
||||
filters=filters,
|
||||
fields=["name"] + parent_fields,
|
||||
fields=["name", *parent_fields],
|
||||
limit_page_length=self.export_page_length,
|
||||
order_by=order_by,
|
||||
as_list=0,
|
||||
|
@ -175,9 +175,13 @@ class Exporter:
|
|||
continue
|
||||
child_table_df = self.meta.get_field(key)
|
||||
child_table_doctype = child_table_df.options
|
||||
child_fields = ["name", "idx", "parent", "parentfield"] + list(
|
||||
{format_column_name(df) for df in self.fields if df.parent == child_table_doctype}
|
||||
)
|
||||
child_fields = [
|
||||
"name",
|
||||
"idx",
|
||||
"parent",
|
||||
"parentfield",
|
||||
*list({format_column_name(df) for df in self.fields if df.parent == child_table_doctype}),
|
||||
]
|
||||
data = frappe.get_all(
|
||||
child_table_doctype,
|
||||
filters={
|
||||
|
|
|
@ -483,7 +483,7 @@ class ImportFile:
|
|||
"read_only": col.df.read_only,
|
||||
}
|
||||
|
||||
data = [[row.row_number] + row.as_list() for row in self.data]
|
||||
data = [[row.row_number, *row.as_list()] for row in self.data]
|
||||
|
||||
warnings = self.get_warnings()
|
||||
|
||||
|
|
|
@ -1245,7 +1245,7 @@ def validate_fields(meta):
|
|||
if frappe.flags.in_patch or frappe.flags.in_fixtures:
|
||||
return
|
||||
|
||||
if d.fieldtype in ("Link",) + table_fields:
|
||||
if d.fieldtype in ("Link", *table_fields):
|
||||
if not d.options:
|
||||
frappe.throw(
|
||||
_("{0}: Options required for Link or Table type field {1} in row {2}").format(
|
||||
|
|
|
@ -127,7 +127,7 @@ class DocumentNamingSettings(Document):
|
|||
self.validate_series_name(series)
|
||||
|
||||
if options and self.user_must_always_select:
|
||||
options = [""] + options
|
||||
options = ["", *options]
|
||||
|
||||
default = options[0] if options else ""
|
||||
|
||||
|
@ -246,7 +246,7 @@ class DocumentNamingSettings(Document):
|
|||
return "\n".join(NamingSeries(series).get_preview(doc=doc))
|
||||
except Exception as e:
|
||||
frappe.clear_last_message()
|
||||
return _("Failed to generate names from the series") + f"\n{str(e)}"
|
||||
return _("Failed to generate names from the series") + f"\n{e!s}"
|
||||
|
||||
def _fetch_last_doc_if_available(self):
|
||||
"""Fetch last doc for evaluating naming series with fields."""
|
||||
|
|
|
@ -813,7 +813,7 @@ def has_permission(doc, ptype=None, user=None, debug=False):
|
|||
return False
|
||||
|
||||
|
||||
def get_permission_query_conditions(user: str = None) -> str:
|
||||
def get_permission_query_conditions(user: str | None = None) -> str:
|
||||
user = user or frappe.session.user
|
||||
if user == "Administrator":
|
||||
return ""
|
||||
|
|
|
@ -169,7 +169,7 @@ def delete_file(path: str) -> None:
|
|||
os.remove(path)
|
||||
|
||||
|
||||
def remove_file_by_url(file_url: str, doctype: str = None, name: str = None) -> "Document":
|
||||
def remove_file_by_url(file_url: str, doctype: str | None = None, name: str | None = None) -> "Document":
|
||||
if doctype and name:
|
||||
fid = frappe.db.get_value(
|
||||
"File", {"file_url": file_url, "attached_to_doctype": doctype, "attached_to_name": name}
|
||||
|
@ -276,7 +276,7 @@ def extract_images_from_html(doc: "Document", content: str, is_private: bool = F
|
|||
return content
|
||||
|
||||
|
||||
def get_random_filename(content_type: str = None) -> str:
|
||||
def get_random_filename(content_type: str | None = None) -> str:
|
||||
extn = None
|
||||
if content_type:
|
||||
extn = mimetypes.guess_extension(content_type)
|
||||
|
@ -413,7 +413,7 @@ def decode_file_content(content: bytes) -> bytes:
|
|||
return safe_b64decode(content)
|
||||
|
||||
|
||||
def find_file_by_url(path: str, name: str = None) -> Optional["File"]:
|
||||
def find_file_by_url(path: str, name: str | None = None) -> Optional["File"]:
|
||||
filters = {"file_url": str(path)}
|
||||
if name:
|
||||
filters["name"] = str(name)
|
||||
|
|
|
@ -195,7 +195,7 @@ def run_scheduled_job(job_type: str):
|
|||
print(frappe.get_traceback())
|
||||
|
||||
|
||||
def sync_jobs(hooks: dict = None):
|
||||
def sync_jobs(hooks: dict | None = None):
|
||||
frappe.reload_doc("core", "doctype", "scheduled_job_type")
|
||||
scheduler_events = hooks or frappe.get_hooks("scheduler_events")
|
||||
all_events = insert_events(scheduler_events)
|
||||
|
@ -232,7 +232,7 @@ def insert_event_jobs(events: list, event_type: str) -> list:
|
|||
return event_jobs
|
||||
|
||||
|
||||
def insert_single_event(frequency: str, event: str, cron_format: str = None):
|
||||
def insert_single_event(frequency: str, event: str, cron_format: str | None = None):
|
||||
cron_expr = {"cron_format": cron_format} if cron_format else {}
|
||||
|
||||
try:
|
||||
|
|
|
@ -843,7 +843,7 @@ def get_perm_info(role):
|
|||
|
||||
@frappe.whitelist(allow_guest=True)
|
||||
def update_password(
|
||||
new_password: str, logout_all_sessions: int = 0, key: str = None, old_password: str = None
|
||||
new_password: str, logout_all_sessions: int = 0, key: str | None = None, old_password: str | None = None
|
||||
):
|
||||
"""Update password for the current user.
|
||||
|
||||
|
@ -1214,7 +1214,7 @@ def handle_password_test_fail(feedback: dict):
|
|||
suggestions = feedback.get("suggestions", [])
|
||||
warning = feedback.get("warning", "")
|
||||
|
||||
frappe.throw(msg=" ".join([warning] + suggestions), title=_("Invalid Password"))
|
||||
frappe.throw(msg=" ".join([warning, *suggestions]), title=_("Invalid Password"))
|
||||
|
||||
|
||||
def update_gravatar(name):
|
||||
|
|
|
@ -42,7 +42,8 @@ FRAPPE_EXCLUSIONS = [
|
|||
"*frappe/setup.py",
|
||||
"*/doctype/*/*_dashboard.py",
|
||||
"*/patches/*",
|
||||
] + TESTED_VIA_CLI
|
||||
*TESTED_VIA_CLI,
|
||||
]
|
||||
|
||||
|
||||
class CodeCoverage:
|
||||
|
|
|
@ -10,7 +10,7 @@ import traceback
|
|||
from collections.abc import Iterable, Sequence
|
||||
from contextlib import contextmanager, suppress
|
||||
from time import time
|
||||
from typing import TYPE_CHECKING, Any, Union
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from pypika.dialects import MySQLQueryBuilder, PostgreSQLQueryBuilder
|
||||
|
||||
|
@ -57,10 +57,10 @@ class Database:
|
|||
VARCHAR_LEN = 140
|
||||
MAX_COLUMN_LENGTH = 64
|
||||
|
||||
OPTIONAL_COLUMNS = ["_user_tags", "_comments", "_assign", "_liked_by"]
|
||||
DEFAULT_SHORTCUTS = ["_Login", "__user", "_Full Name", "Today", "__today", "now", "Now"]
|
||||
OPTIONAL_COLUMNS = ("_user_tags", "_comments", "_assign", "_liked_by")
|
||||
DEFAULT_SHORTCUTS = ("_Login", "__user", "_Full Name", "Today", "__today", "now", "Now")
|
||||
STANDARD_VARCHAR_COLUMNS = ("name", "owner", "modified_by")
|
||||
DEFAULT_COLUMNS = ["name", "creation", "modified", "modified_by", "owner", "docstatus", "idx"]
|
||||
DEFAULT_COLUMNS = ("name", "creation", "modified", "modified_by", "owner", "docstatus", "idx")
|
||||
CHILD_TABLE_COLUMNS = ("parent", "parenttype", "parentfield")
|
||||
MAX_WRITES_PER_TRANSACTION = 200_000
|
||||
|
||||
|
@ -1131,7 +1131,7 @@ class Database:
|
|||
return getdate(date).strftime("%Y-%m-%d")
|
||||
|
||||
@staticmethod
|
||||
def format_datetime(datetime): # noqa: F811
|
||||
def format_datetime(datetime):
|
||||
if not datetime:
|
||||
return FallBackDateTimeStr
|
||||
|
||||
|
@ -1234,7 +1234,7 @@ class Database:
|
|||
query = sql_dict.get(current_dialect)
|
||||
return self.sql(query, values, **kwargs)
|
||||
|
||||
def delete(self, doctype: str, filters: dict | list = None, debug=False, **kwargs):
|
||||
def delete(self, doctype: str, filters: dict | list | None = None, debug=False, **kwargs):
|
||||
"""Delete rows from a table in site which match the passed filters. This
|
||||
does trigger DocType hooks. Simply runs a DELETE query in the database.
|
||||
|
||||
|
|
|
@ -393,7 +393,7 @@ class MariaDBDatabase(MariaDBConnectionUtil, MariaDBExceptionUtil, Database):
|
|||
if not clustered_index:
|
||||
return index
|
||||
|
||||
def add_index(self, doctype: str, fields: list, index_name: str = None):
|
||||
def add_index(self, doctype: str, fields: list, index_name: str | None = None):
|
||||
"""Creates an index with given fields if not already created.
|
||||
Index name will be `fieldname1_fieldname2_index`"""
|
||||
index_name = index_name or self.get_index_name(fields)
|
||||
|
|
|
@ -342,7 +342,7 @@ class PostgresDatabase(PostgresExceptionUtil, Database):
|
|||
and indexname='{index_name}' limit 1"""
|
||||
)
|
||||
|
||||
def add_index(self, doctype: str, fields: list, index_name: str = None):
|
||||
def add_index(self, doctype: str, fields: list, index_name: str | None = None):
|
||||
"""Creates an index with given fields if not already created.
|
||||
Index name will be `fieldname1_fieldname2_index`"""
|
||||
table_name = get_table_name(doctype)
|
||||
|
|
|
@ -504,7 +504,7 @@ class ChildQuery:
|
|||
}
|
||||
return frappe.qb.get_query(
|
||||
self.doctype,
|
||||
fields=self.fields + ["parent", "parentfield"],
|
||||
fields=[*self.fields, "parent", "parentfield"],
|
||||
filters=filters,
|
||||
order_by="idx asc",
|
||||
)
|
||||
|
|
|
@ -49,7 +49,7 @@ class DBTable:
|
|||
pass
|
||||
|
||||
def get_column_definitions(self):
|
||||
column_list = [] + frappe.db.DEFAULT_COLUMNS
|
||||
column_list = [*frappe.db.DEFAULT_COLUMNS]
|
||||
ret = []
|
||||
for k in list(self.columns):
|
||||
if k not in column_list:
|
||||
|
|
|
@ -419,7 +419,7 @@ def get_workspace_sidebar_items():
|
|||
blocked_modules.append("Dummy Module")
|
||||
|
||||
# adding None to allowed_domains to include pages without domain restriction
|
||||
allowed_domains = [None] + frappe.get_active_domains()
|
||||
allowed_domains = [None, *frappe.get_active_domains()]
|
||||
|
||||
filters = {
|
||||
"restrict_to_domain": ["in", allowed_domains],
|
||||
|
|
|
@ -134,7 +134,7 @@ class SubmittableDocumentTree:
|
|||
|
||||
def get_document_sources(self):
|
||||
"""Return list of doctypes from where we access submittable documents."""
|
||||
return list(set(self.get_link_sources() + [self.root_doctype]))
|
||||
return list(set([*self.get_link_sources(), self.root_doctype]))
|
||||
|
||||
def get_link_sources(self):
|
||||
"""limit doctype links to these doctypes."""
|
||||
|
@ -149,15 +149,15 @@ class SubmittableDocumentTree:
|
|||
return self._submittable_doctypes
|
||||
|
||||
|
||||
def get_child_tables_of_doctypes(doctypes: list[str] = None):
|
||||
def get_child_tables_of_doctypes(doctypes: list[str] | None = None):
|
||||
"""Return child tables by doctype."""
|
||||
filters = [["fieldtype", "=", "Table"]]
|
||||
filters_for_docfield = filters
|
||||
filters_for_customfield = filters
|
||||
|
||||
if doctypes:
|
||||
filters_for_docfield = filters + [["parent", "in", tuple(doctypes)]]
|
||||
filters_for_customfield = filters + [["dt", "in", tuple(doctypes)]]
|
||||
filters_for_docfield = [*filters, ["parent", "in", tuple(doctypes)]]
|
||||
filters_for_customfield = [*filters, ["dt", "in", tuple(doctypes)]]
|
||||
|
||||
links = frappe.get_all(
|
||||
"DocField",
|
||||
|
@ -184,7 +184,7 @@ def get_child_tables_of_doctypes(doctypes: list[str] = None):
|
|||
|
||||
|
||||
def get_references_across_doctypes(
|
||||
to_doctypes: list[str] = None, limit_link_doctypes: list[str] = None
|
||||
to_doctypes: list[str] | None = None, limit_link_doctypes: list[str] | None = None
|
||||
) -> list:
|
||||
"""Find doctype wise foreign key references.
|
||||
|
||||
|
@ -221,7 +221,7 @@ def get_references_across_doctypes(
|
|||
|
||||
|
||||
def get_references_across_doctypes_by_link_field(
|
||||
to_doctypes: list[str] = None, limit_link_doctypes: list[str] = None
|
||||
to_doctypes: list[str] | None = None, limit_link_doctypes: list[str] | None = None
|
||||
):
|
||||
"""Find doctype wise foreign key references based on link fields.
|
||||
|
||||
|
@ -261,7 +261,7 @@ def get_references_across_doctypes_by_link_field(
|
|||
|
||||
|
||||
def get_references_across_doctypes_by_dynamic_link_field(
|
||||
to_doctypes: list[str] = None, limit_link_doctypes: list[str] = None
|
||||
to_doctypes: list[str] | None = None, limit_link_doctypes: list[str] | None = None
|
||||
):
|
||||
"""Find doctype wise foreign key references based on dynamic link fields.
|
||||
|
||||
|
@ -315,7 +315,7 @@ def get_referencing_documents(
|
|||
reference_names: list[str],
|
||||
link_info: dict,
|
||||
get_parent_if_child_table_doc: bool = True,
|
||||
parent_filters: list[list] = None,
|
||||
parent_filters: list[list] | None = None,
|
||||
child_filters=None,
|
||||
allowed_parents=None,
|
||||
):
|
||||
|
@ -439,7 +439,7 @@ def get_linked_docs(doctype: str, name: str, linkinfo: dict | None = None) -> di
|
|||
"fields",
|
||||
{
|
||||
"in_list_view": 1,
|
||||
"fieldtype": ["not in", ("Image", "HTML", "Button") + frappe.model.table_fields],
|
||||
"fieldtype": ["not in", ("Image", "HTML", "Button", *frappe.model.table_fields)],
|
||||
},
|
||||
)
|
||||
] + ["name", "modified", "docstatus"]
|
||||
|
|
|
@ -291,7 +291,7 @@ def get_prepared_report_result(report, filters, dn="", user=None):
|
|||
report_data = get_report_data(doc, data)
|
||||
except Exception as e:
|
||||
doc.log_error("Prepared report render failed")
|
||||
frappe.msgprint(_("Prepared report render failed") + f": {str(e)}")
|
||||
frappe.msgprint(_("Prepared report render failed") + f": {e!s}")
|
||||
doc = None
|
||||
|
||||
return report_data | {"prepared_report": True, "doc": doc}
|
||||
|
|
|
@ -360,8 +360,8 @@ def export_query():
|
|||
if add_totals_row:
|
||||
ret = append_totals_row(ret)
|
||||
|
||||
data = [[_("Sr")] + get_labels(db_query.fields, doctype)]
|
||||
data.extend([i + 1] + list(row) for i, row in enumerate(ret))
|
||||
data = [[_("Sr"), *get_labels(db_query.fields, doctype)]]
|
||||
data.extend([i + 1, *list(row)] for i, row in enumerate(ret))
|
||||
data = handle_duration_fieldtype_values(doctype, data, db_query.fields)
|
||||
|
||||
if file_format_type == "CSV":
|
||||
|
@ -543,7 +543,7 @@ def get_stats(stats, doctype, filters=None):
|
|||
tag_count = frappe.get_list(
|
||||
doctype,
|
||||
fields=[column, "count(*)"],
|
||||
filters=filters + [[column, "!=", ""]],
|
||||
filters=[*filters, [column, "!=", ""]],
|
||||
group_by=column,
|
||||
as_list=True,
|
||||
distinct=1,
|
||||
|
@ -554,7 +554,7 @@ def get_stats(stats, doctype, filters=None):
|
|||
no_tag_count = frappe.get_list(
|
||||
doctype,
|
||||
fields=[column, "count(*)"],
|
||||
filters=filters + [[column, "in", ("", ",")]],
|
||||
filters=[*filters, [column, "in", ("", ",")]],
|
||||
as_list=True,
|
||||
group_by=column,
|
||||
order_by=column,
|
||||
|
@ -593,7 +593,7 @@ def get_filter_dashboard_data(stats, doctype, filters=None):
|
|||
tagcount = frappe.get_list(
|
||||
doctype,
|
||||
fields=[tag["name"], "count(*)"],
|
||||
filters=filters + ["ifnull(`%s`,'')!=''" % tag["name"]],
|
||||
filters=[*filters, "ifnull(`%s`,'')!=''" % tag["name"]],
|
||||
group_by=tag["name"],
|
||||
as_list=True,
|
||||
)
|
||||
|
@ -615,7 +615,7 @@ def get_filter_dashboard_data(stats, doctype, filters=None):
|
|||
frappe.get_list(
|
||||
doctype,
|
||||
fields=[tag["name"], "count(*)"],
|
||||
filters=filters + ["({0} = '' or {0} is null)".format(tag["name"])],
|
||||
filters=[*filters, "({0} = '' or {0} is null)".format(tag["name"])],
|
||||
as_list=True,
|
||||
)[0][1],
|
||||
]
|
||||
|
|
|
@ -63,7 +63,7 @@ def search_widget(
|
|||
doctype: str,
|
||||
txt: str,
|
||||
query: str | None = None,
|
||||
searchfield: str = None,
|
||||
searchfield: str | None = None,
|
||||
start: int = 0,
|
||||
page_length: int = 10,
|
||||
filters: str | None | dict | list = None,
|
||||
|
|
|
@ -19,7 +19,7 @@ def get_contact_list(txt, page_length=20) -> list[dict]:
|
|||
fields = ["first_name", "middle_name", "last_name", "company_name"]
|
||||
contacts = frappe.get_list(
|
||||
"Contact",
|
||||
fields=fields + ["`tabContact Email`.email_id"],
|
||||
fields=[*fields, "`tabContact Email`.email_id"],
|
||||
filters=[
|
||||
["Contact Email", "email_id", "is", "set"],
|
||||
],
|
||||
|
|
|
@ -34,14 +34,14 @@ def cache_email_account(cache_name):
|
|||
setattr(frappe.local, cache_name, {})
|
||||
|
||||
cached_accounts = getattr(frappe.local, cache_name)
|
||||
match_by = list(kwargs.values()) + ["default"]
|
||||
match_by = [*list(kwargs.values()), "default"]
|
||||
matched_accounts = list(filter(None, [cached_accounts.get(key) for key in match_by]))
|
||||
if matched_accounts:
|
||||
return matched_accounts[0]
|
||||
|
||||
matched_accounts = func(*args, **kwargs)
|
||||
cached_accounts.update(matched_accounts or {})
|
||||
return matched_accounts and list(matched_accounts.values())[0]
|
||||
return matched_accounts and next(iter(matched_accounts.values()))
|
||||
|
||||
return wrapper_cache_email_account
|
||||
|
||||
|
|
|
@ -37,11 +37,11 @@ class EmailGroup(Document):
|
|||
def import_from(self, doctype):
|
||||
"""Extract Email Addresses from given doctype and add them to the current list"""
|
||||
meta = frappe.get_meta(doctype)
|
||||
email_field = [
|
||||
email_field = next(
|
||||
d.fieldname
|
||||
for d in meta.fields
|
||||
if d.fieldtype in ("Data", "Small Text", "Text", "Code") and d.options == "Email"
|
||||
][0]
|
||||
)
|
||||
unsubscribed_field = "unsubscribed" if meta.get_field("unsubscribed") else None
|
||||
added = 0
|
||||
|
||||
|
|
|
@ -726,7 +726,7 @@ class QueueBuilder:
|
|||
# This re-uses smtp server instance to minimize the cost of new session creation
|
||||
smtp_server_instance = None
|
||||
for r in final_recipients:
|
||||
recipients = list(set([r] + self.final_cc() + self.bcc))
|
||||
recipients = list(set([r, *self.final_cc(), *self.bcc]))
|
||||
q = EmailQueue.new({**queue_data, **{"recipients": recipients}}, ignore_permissions=True)
|
||||
if not smtp_server_instance:
|
||||
email_account = q.get_email_account(raise_error=True)
|
||||
|
|
|
@ -668,7 +668,7 @@ class InboundMail(Email):
|
|||
# replace inline images
|
||||
content = self.content
|
||||
for file in attachments:
|
||||
if file.name in self.cid_map and self.cid_map[file.name]:
|
||||
if self.cid_map.get(file.name):
|
||||
content = content.replace(f"cid:{self.cid_map[file.name]}", file.unique_url)
|
||||
return content
|
||||
|
||||
|
|
|
@ -243,7 +243,7 @@ def upload_file():
|
|||
).save(ignore_permissions=ignore_permissions)
|
||||
|
||||
|
||||
def check_write_permission(doctype: str = None, name: str = None):
|
||||
def check_write_permission(doctype: str | None = None, name: str | None = None):
|
||||
check_doctype = doctype and not name
|
||||
if doctype and name:
|
||||
try:
|
||||
|
|
|
@ -231,7 +231,7 @@ def fetch_details_from_tag(_tag: str) -> tuple[str, str, str]:
|
|||
try:
|
||||
repo, tag = app_tag
|
||||
except ValueError:
|
||||
repo, tag = app_tag + [None]
|
||||
repo, tag = [*app_tag, None]
|
||||
|
||||
try:
|
||||
org, repo = org_repo
|
||||
|
|
|
@ -183,7 +183,7 @@ class LDAPSettings(Document):
|
|||
setattr(user, key, value)
|
||||
user.save(ignore_permissions=True)
|
||||
|
||||
def sync_roles(self, user: "User", additional_groups: list = None):
|
||||
def sync_roles(self, user: "User", additional_groups: list | None = None):
|
||||
current_roles = {d.role for d in user.get("roles")}
|
||||
if self.default_user_type == "System User":
|
||||
needed_roles = {self.default_role}
|
||||
|
@ -203,7 +203,7 @@ class LDAPSettings(Document):
|
|||
|
||||
user.remove_roles(*roles_to_remove)
|
||||
|
||||
def create_or_update_user(self, user_data: dict, groups: list = None):
|
||||
def create_or_update_user(self, user_data: dict, groups: list | None = None):
|
||||
user: "User" = None
|
||||
role: str = None
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import contextlib
|
|||
import functools
|
||||
import os
|
||||
import ssl
|
||||
import typing
|
||||
from unittest import TestCase, mock
|
||||
|
||||
import ldap3
|
||||
|
@ -18,7 +19,7 @@ class LDAP_TestCase:
|
|||
TEST_LDAP_SERVER = None # must match the 'LDAP Settings' field option
|
||||
TEST_LDAP_SEARCH_STRING = None
|
||||
LDAP_USERNAME_FIELD = None
|
||||
DOCUMENT_GROUP_MAPPINGS = []
|
||||
DOCUMENT_GROUP_MAPPINGS: typing.ClassVar[list] = []
|
||||
LDAP_SCHEMA = None
|
||||
LDAP_LDIF_JSON = None
|
||||
TEST_VALUES_LDAP_COMPLEX_SEARCH_STRING = None
|
||||
|
@ -606,7 +607,7 @@ class LDAP_TestCase:
|
|||
class Test_OpenLDAP(LDAP_TestCase, TestCase):
|
||||
TEST_LDAP_SERVER = "OpenLDAP"
|
||||
TEST_LDAP_SEARCH_STRING = "(uid={0})"
|
||||
DOCUMENT_GROUP_MAPPINGS = [
|
||||
DOCUMENT_GROUP_MAPPINGS: typing.ClassVar[list] = [
|
||||
{
|
||||
"doctype": "LDAP Group Mapping",
|
||||
"ldap_group": "Administrators",
|
||||
|
@ -619,7 +620,7 @@ class Test_OpenLDAP(LDAP_TestCase, TestCase):
|
|||
LDAP_SCHEMA = OFFLINE_SLAPD_2_4
|
||||
LDAP_LDIF_JSON = "test_data_ldif_openldap.json"
|
||||
|
||||
TEST_VALUES_LDAP_COMPLEX_SEARCH_STRING = [
|
||||
TEST_VALUES_LDAP_COMPLEX_SEARCH_STRING: typing.ClassVar[list] = [
|
||||
"(uid={0})",
|
||||
"(&(objectclass=posixaccount)(uid={0}))",
|
||||
"(&(description=*ACCESS:test1*)(uid={0}))", # OpenLDAP has no member of group, use description to filter posix.user has equivilent of AD 'memberOf'
|
||||
|
@ -630,7 +631,7 @@ class Test_OpenLDAP(LDAP_TestCase, TestCase):
|
|||
class Test_ActiveDirectory(LDAP_TestCase, TestCase):
|
||||
TEST_LDAP_SERVER = "Active Directory"
|
||||
TEST_LDAP_SEARCH_STRING = "(samaccountname={0})"
|
||||
DOCUMENT_GROUP_MAPPINGS = [
|
||||
DOCUMENT_GROUP_MAPPINGS: typing.ClassVar[list] = [
|
||||
{
|
||||
"doctype": "LDAP Group Mapping",
|
||||
"ldap_group": "Domain Administrators",
|
||||
|
@ -647,7 +648,7 @@ class Test_ActiveDirectory(LDAP_TestCase, TestCase):
|
|||
LDAP_SCHEMA = OFFLINE_AD_2012_R2
|
||||
LDAP_LDIF_JSON = "test_data_ldif_activedirectory.json"
|
||||
|
||||
TEST_VALUES_LDAP_COMPLEX_SEARCH_STRING = [
|
||||
TEST_VALUES_LDAP_COMPLEX_SEARCH_STRING: typing.ClassVar[dict] = [
|
||||
"(samaccountname={0})",
|
||||
"(&(objectclass=user)(samaccountname={0}))",
|
||||
"(&(description=*ACCESS:test1*)(samaccountname={0}))", # OpenLDAP has no member of group, use description to filter posix.user has equivilent of AD 'memberOf'
|
||||
|
|
|
@ -165,7 +165,7 @@ def is_valid_access_token(access_token: str) -> bool:
|
|||
|
||||
|
||||
@frappe.whitelist(methods=["GET"])
|
||||
def callback(state: str, code: str = None, error: str = None) -> None:
|
||||
def callback(state: str, code: str | None = None, error: str | None = None) -> None:
|
||||
"""Common callback for google integrations.
|
||||
Invokes functions using `frappe.get_attr` and also adds required (keyworded) arguments
|
||||
along with committing and redirecting us back to frappe site."""
|
||||
|
|
|
@ -616,7 +616,7 @@ class BaseDocument:
|
|||
SET {values} WHERE `name`=%s""".format(
|
||||
doctype=self.doctype, values=", ".join("`" + c + "`=%s" for c in columns)
|
||||
),
|
||||
list(d.values()) + [name],
|
||||
[*list(d.values()), name],
|
||||
)
|
||||
except Exception as e:
|
||||
if frappe.db.is_unique_key_violation(e):
|
||||
|
|
|
@ -47,7 +47,7 @@ def get_dynamic_link_map(for_delete=False):
|
|||
)
|
||||
for doctype in links:
|
||||
dynamic_link_map.setdefault(doctype, []).append(df)
|
||||
except frappe.db.TableMissingError: # noqa: E722
|
||||
except frappe.db.TableMissingError:
|
||||
pass
|
||||
|
||||
frappe.local.dynamic_link_map = dynamic_link_map
|
||||
|
|
|
@ -106,10 +106,10 @@ class Meta(Document):
|
|||
"DocType State",
|
||||
)
|
||||
)
|
||||
standard_set_once_fields = [
|
||||
standard_set_once_fields = (
|
||||
frappe._dict(fieldname="creation", fieldtype="Datetime"),
|
||||
frappe._dict(fieldname="owner", fieldtype="Data"),
|
||||
]
|
||||
)
|
||||
|
||||
def __init__(self, doctype):
|
||||
if isinstance(doctype, Document):
|
||||
|
|
|
@ -101,7 +101,7 @@ class NamingSeries:
|
|||
# ignore B023: binding `count` is not necessary because
|
||||
# function is evaluated immediately and it can not be done
|
||||
# because of function signature requirement
|
||||
return str(count).zfill(digits) # noqa: B023
|
||||
return str(count).zfill(digits)
|
||||
|
||||
generated_names.append(parse_naming_series(self.series, doc=doc, number_generator=fake_counter))
|
||||
return generated_names
|
||||
|
|
|
@ -119,7 +119,7 @@ def update_document_title(
|
|||
def rename_doc(
|
||||
doctype: str | None = None,
|
||||
old: str | None = None,
|
||||
new: str = None,
|
||||
new: str | None = None,
|
||||
force: bool = False,
|
||||
merge: bool = False,
|
||||
ignore_permissions: bool = False,
|
||||
|
@ -390,7 +390,7 @@ def validate_rename(
|
|||
|
||||
def rename_doctype(doctype: str, old: str, new: str) -> None:
|
||||
# change options for fieldtype Table, Table MultiSelect and Link
|
||||
fields_with_options = ("Link",) + frappe.model.table_fields
|
||||
fields_with_options = ("Link", *frappe.model.table_fields)
|
||||
|
||||
for fieldtype in fields_with_options:
|
||||
update_options_for_fieldtype(fieldtype, old, new)
|
||||
|
|
|
@ -120,7 +120,7 @@ def apply_workflow(doc, action):
|
|||
doc.set(workflow.workflow_state_field, transition.next_state)
|
||||
|
||||
# find settings for the next state
|
||||
next_state = [d for d in workflow.states if d.state == transition.next_state][0]
|
||||
next_state = next(d for d in workflow.states if d.state == transition.next_state)
|
||||
|
||||
# update any additional field
|
||||
if next_state.update_field:
|
||||
|
|
|
@ -75,7 +75,7 @@ def import_file_by_path(
|
|||
force: bool = False,
|
||||
data_import: bool = False,
|
||||
pre_process=None,
|
||||
ignore_version: bool = None,
|
||||
ignore_version: bool | None = None,
|
||||
reset_permissions: bool = False,
|
||||
) -> bool:
|
||||
"""Import file from the given path.
|
||||
|
|
|
@ -187,8 +187,8 @@ def get_doc_path(module: str, doctype: str, name: str) -> str:
|
|||
|
||||
def reload_doc(
|
||||
module: str,
|
||||
dt: str = None,
|
||||
dn: str = None,
|
||||
dt: str | None = None,
|
||||
dn: str | None = None,
|
||||
force: bool = False,
|
||||
reset_permissions: bool = False,
|
||||
):
|
||||
|
|
|
@ -38,7 +38,7 @@ def execute():
|
|||
frappe.db.sql_ddl(f"ALTER TABLE `{table_name}` DROP INDEX `{index}`")
|
||||
except Exception as e:
|
||||
frappe.log_error("Failed to drop index")
|
||||
print(f"x Failed to drop index {index} from {table_name}\n {str(e)}")
|
||||
print(f"x Failed to drop index {index} from {table_name}\n {e!s}")
|
||||
else:
|
||||
print(f"✓ dropped {index} index from {table}")
|
||||
|
||||
|
|
|
@ -50,7 +50,7 @@ def drop_index_if_exists(table: str, index: str):
|
|||
frappe.db.sql_ddl(f"ALTER TABLE `{table}` DROP INDEX `{index}`")
|
||||
except Exception as e:
|
||||
frappe.log_error("Failed to drop index")
|
||||
click.secho(f"x Failed to drop index {index} from {table}\n {str(e)}", fg="red")
|
||||
click.secho(f"x Failed to drop index {index} from {table}\n {e!s}", fg="red")
|
||||
return
|
||||
|
||||
click.echo(f"✓ dropped {index} index from {table}")
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import types
|
||||
import typing
|
||||
|
||||
from pypika import MySQLQuery, Order, PostgreSQLQuery, terms
|
||||
|
@ -62,8 +63,8 @@ class MariaDB(Base, MySQLQuery):
|
|||
|
||||
|
||||
class Postgres(Base, PostgreSQLQuery):
|
||||
field_translation = {"table_name": "relname", "table_rows": "n_tup_ins"}
|
||||
schema_translation = {"tables": "pg_stat_all_tables"}
|
||||
field_translation = types.MappingProxyType({"table_name": "relname", "table_rows": "n_tup_ins"})
|
||||
schema_translation = types.MappingProxyType({"tables": "pg_stat_all_tables"})
|
||||
# TODO: Find a better way to do this
|
||||
# These are interdependent query changes that need fixing. These
|
||||
# translations happen in the same query. But there is no check to see if
|
||||
|
|
|
@ -89,7 +89,7 @@ class RateLimiter:
|
|||
|
||||
|
||||
def rate_limit(
|
||||
key: str = None,
|
||||
key: str | None = None,
|
||||
limit: int | Callable = 5,
|
||||
seconds: int = 24 * 60 * 60,
|
||||
methods: str | list = "ALL",
|
||||
|
|
|
@ -21,13 +21,13 @@ def publish_progress(percent, title=None, doctype=None, docname=None, descriptio
|
|||
|
||||
|
||||
def publish_realtime(
|
||||
event: str = None,
|
||||
message: dict = None,
|
||||
room: str = None,
|
||||
user: str = None,
|
||||
doctype: str = None,
|
||||
docname: str = None,
|
||||
task_id: str = None,
|
||||
event: str | None = None,
|
||||
message: dict | None = None,
|
||||
room: str | None = None,
|
||||
user: str | None = None,
|
||||
doctype: str | None = None,
|
||||
docname: str | None = None,
|
||||
task_id: str | None = None,
|
||||
after_commit: bool = False,
|
||||
):
|
||||
"""Publish real-time updates
|
||||
|
|
|
@ -96,7 +96,7 @@ def slugs_with_web_view(_items_to_index):
|
|||
fields = ["route", doctype.website_search_field]
|
||||
filters = ({doctype.is_published_field: 1},)
|
||||
if doctype.website_search_field:
|
||||
docs = frappe.get_all(doctype.name, filters=filters, fields=fields + ["title"])
|
||||
docs = frappe.get_all(doctype.name, filters=filters, fields=[*fields, "title"])
|
||||
for doc in docs:
|
||||
content = frappe.utils.md_to_html(getattr(doc, doctype.website_search_field))
|
||||
soup = BeautifulSoup(content, "html.parser")
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import json
|
||||
import sys
|
||||
import typing
|
||||
from contextlib import contextmanager
|
||||
from functools import cached_property
|
||||
from random import choice
|
||||
|
@ -15,7 +16,7 @@ from werkzeug.test import TestResponse
|
|||
import frappe
|
||||
from frappe.installer import update_site_config
|
||||
from frappe.tests.utils import FrappeTestCase, patch_hooks
|
||||
from frappe.utils import cint, get_site_url, get_test_client, get_url
|
||||
from frappe.utils import cint, get_test_client, get_url
|
||||
|
||||
try:
|
||||
_site = frappe.local.site
|
||||
|
@ -40,7 +41,7 @@ def make_request(
|
|||
target: str,
|
||||
args: tuple | None = None,
|
||||
kwargs: dict | None = None,
|
||||
site: str = None,
|
||||
site: str | None = None,
|
||||
) -> TestResponse:
|
||||
t = ThreadWithReturnValue(target=target, args=args, kwargs=kwargs, site=site)
|
||||
t.start()
|
||||
|
@ -132,7 +133,7 @@ class FrappeAPITestCase(FrappeTestCase):
|
|||
|
||||
class TestResourceAPI(FrappeAPITestCase):
|
||||
DOCTYPE = "ToDo"
|
||||
GENERATED_DOCUMENTS = []
|
||||
GENERATED_DOCUMENTS: typing.ClassVar[list] = []
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import typing
|
||||
from random import choice
|
||||
|
||||
import requests
|
||||
|
@ -19,7 +20,7 @@ resource_key = {
|
|||
class TestResourceAPIV2(FrappeAPITestCase):
|
||||
version = "v2"
|
||||
DOCTYPE = "ToDo"
|
||||
GENERATED_DOCUMENTS = []
|
||||
GENERATED_DOCUMENTS: typing.ClassVar[list] = []
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
|
|
|
@ -10,6 +10,7 @@ import secrets
|
|||
import shlex
|
||||
import string
|
||||
import subprocess
|
||||
import types
|
||||
import unittest
|
||||
from contextlib import contextmanager
|
||||
from functools import wraps
|
||||
|
@ -605,15 +606,17 @@ class TestCommands(BaseTestCommands):
|
|||
|
||||
|
||||
class TestBackups(BaseTestCommands):
|
||||
backup_map = {
|
||||
"includes": {
|
||||
"includes": [
|
||||
"ToDo",
|
||||
"Note",
|
||||
]
|
||||
},
|
||||
"excludes": {"excludes": ["Activity Log", "Access Log", "Error Log"]},
|
||||
}
|
||||
backup_map = types.MappingProxyType(
|
||||
{
|
||||
"includes": {
|
||||
"includes": [
|
||||
"ToDo",
|
||||
"Note",
|
||||
]
|
||||
},
|
||||
"excludes": {"excludes": ["Activity Log", "Access Log", "Error Log"]},
|
||||
}
|
||||
)
|
||||
home = os.path.expanduser("~")
|
||||
site_backup_path = frappe.utils.get_site_path("private", "backups")
|
||||
|
||||
|
|
|
@ -346,36 +346,40 @@ class TestDB(FrappeTestCase):
|
|||
random_value = random_string(20)
|
||||
|
||||
# Testing read
|
||||
self.assertEqual(list(frappe.get_all("ToDo", fields=[random_field], limit=1)[0])[0], random_field)
|
||||
self.assertEqual(next(iter(frappe.get_all("ToDo", fields=[random_field], limit=1)[0])), random_field)
|
||||
self.assertEqual(
|
||||
list(frappe.get_all("ToDo", fields=[f"`{random_field}` as total"], limit=1)[0])[0], "total"
|
||||
next(iter(frappe.get_all("ToDo", fields=[f"`{random_field}` as total"], limit=1)[0])), "total"
|
||||
)
|
||||
|
||||
# Testing read for distinct and sql functions
|
||||
self.assertEqual(
|
||||
list(
|
||||
frappe.get_all(
|
||||
"ToDo",
|
||||
fields=[f"`{random_field}` as total"],
|
||||
distinct=True,
|
||||
limit=1,
|
||||
)[0]
|
||||
)[0],
|
||||
next(
|
||||
iter(
|
||||
frappe.get_all(
|
||||
"ToDo",
|
||||
fields=[f"`{random_field}` as total"],
|
||||
distinct=True,
|
||||
limit=1,
|
||||
)[0]
|
||||
)
|
||||
),
|
||||
"total",
|
||||
)
|
||||
self.assertEqual(
|
||||
list(
|
||||
frappe.get_all(
|
||||
"ToDo",
|
||||
fields=[f"`{random_field}`"],
|
||||
distinct=True,
|
||||
limit=1,
|
||||
)[0]
|
||||
)[0],
|
||||
next(
|
||||
iter(
|
||||
frappe.get_all(
|
||||
"ToDo",
|
||||
fields=[f"`{random_field}`"],
|
||||
distinct=True,
|
||||
limit=1,
|
||||
)[0]
|
||||
)
|
||||
),
|
||||
random_field,
|
||||
)
|
||||
self.assertEqual(
|
||||
list(frappe.get_all("ToDo", fields=[f"count(`{random_field}`)"], limit=1)[0])[0],
|
||||
next(iter(frappe.get_all("ToDo", fields=[f"count(`{random_field}`)"], limit=1)[0])),
|
||||
"count" if frappe.conf.db_type == "postgres" else f"count(`{random_field}`)",
|
||||
)
|
||||
|
||||
|
|
|
@ -13,13 +13,13 @@ test_dependencies = ["Blog Category", "Blogger"]
|
|||
class TestFormLoad(FrappeTestCase):
|
||||
def test_load(self):
|
||||
getdoctype("DocType")
|
||||
meta = list(filter(lambda d: d.name == "DocType", frappe.response.docs))[0]
|
||||
meta = next(filter(lambda d: d.name == "DocType", frappe.response.docs))
|
||||
self.assertEqual(meta.name, "DocType")
|
||||
self.assertTrue(meta.get("__js"))
|
||||
|
||||
frappe.response.docs = []
|
||||
getdoctype("Event")
|
||||
meta = list(filter(lambda d: d.name == "Event", frappe.response.docs))[0]
|
||||
meta = next(filter(lambda d: d.name == "Event", frappe.response.docs))
|
||||
self.assertTrue(meta.get("__calendar_js"))
|
||||
|
||||
def test_fieldlevel_permissions_in_load(self):
|
||||
|
|
|
@ -319,7 +319,7 @@ class TestNaming(FrappeTestCase):
|
|||
def test_naming_series_validation(self):
|
||||
dns = frappe.get_doc("Document Naming Settings")
|
||||
existing_series = dns.get_transactions_and_prefixes()["prefixes"]
|
||||
valid = ["SINV-", "SI-.{field}.", "SI-#.###", ""] + existing_series
|
||||
valid = ["SINV-", "SI-.{field}.", "SI-#.###", "", *existing_series]
|
||||
invalid = ["$INV-", r"WINDOWS\NAMING"]
|
||||
|
||||
for series in valid:
|
||||
|
|
|
@ -173,4 +173,4 @@ def _get_dotted_path(file: Path, app) -> str:
|
|||
*path, filename = file.relative_to(app_path).parts
|
||||
base_filename = Path(filename).stem
|
||||
|
||||
return ".".join([app] + path + [base_filename])
|
||||
return ".".join([app, *path, base_filename])
|
||||
|
|
|
@ -18,7 +18,7 @@ from frappe.utils import add_to_date, now
|
|||
|
||||
|
||||
@contextmanager
|
||||
def patch_db(endpoints: list[str] = None):
|
||||
def patch_db(endpoints: list[str] | None = None):
|
||||
patched_endpoints = []
|
||||
|
||||
for point in endpoints:
|
||||
|
|
|
@ -19,7 +19,6 @@ from frappe.translate import (
|
|||
get_language,
|
||||
get_parent_language,
|
||||
get_translation_dict_from_file,
|
||||
write_translations_file,
|
||||
)
|
||||
from frappe.utils import get_bench_path, set_request
|
||||
|
||||
|
@ -36,10 +35,10 @@ _lazy_translations = _lt("Communication")
|
|||
|
||||
|
||||
class TestTranslate(FrappeTestCase):
|
||||
guest_sessions_required = [
|
||||
guest_sessions_required = (
|
||||
"test_guest_request_language_resolution_with_cookie",
|
||||
"test_guest_request_language_resolution_with_request_header",
|
||||
]
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
if self._testMethodName in self.guest_sessions_required:
|
||||
|
|
|
@ -488,7 +488,7 @@ class TestPythonExpressions(FrappeTestCase):
|
|||
try:
|
||||
validate_python_code(expr)
|
||||
except Exception as e:
|
||||
self.fail(f"Invalid error thrown for valid expression: {expr}: {str(e)}")
|
||||
self.fail(f"Invalid error thrown for valid expression: {expr}: {e!s}")
|
||||
|
||||
def test_validation_for_bad_python_expression(self):
|
||||
invalid_expressions = [
|
||||
|
@ -1075,7 +1075,7 @@ class TestTBSanitization(FrappeTestCase):
|
|||
def test_traceback_sanitzation(self):
|
||||
try:
|
||||
password = "42" # noqa: F841
|
||||
args = {"password": "42", "pwd": "42", "safe": "safe_value"} # noqa: F841
|
||||
args = {"password": "42", "pwd": "42", "safe": "safe_value"}
|
||||
args = frappe._dict({"password": "42", "pwd": "42", "safe": "safe_value"}) # noqa: F841
|
||||
raise Exception
|
||||
except Exception:
|
||||
|
|
|
@ -31,7 +31,7 @@ MERGED_TRANSLATION_KEY = "merged_translations"
|
|||
USER_TRANSLATION_KEY = "lang_user_translations"
|
||||
|
||||
|
||||
def get_language(lang_list: list = None) -> str:
|
||||
def get_language(lang_list: list | None = None) -> str:
|
||||
"""Set `frappe.local.lang` from HTTP headers at beginning of request
|
||||
|
||||
Order of priority for setting language:
|
||||
|
@ -94,7 +94,7 @@ def get_parent_language(lang: str) -> str:
|
|||
return lang.split(sep)[0]
|
||||
|
||||
|
||||
def get_user_lang(user: str = None) -> str:
|
||||
def get_user_lang(user: str | None = None) -> str:
|
||||
"""Set frappe.local.lang from user preferences on session beginning or resumption"""
|
||||
user = user or frappe.session.user
|
||||
lang = frappe.cache.hget("lang", user)
|
||||
|
|
|
@ -354,7 +354,7 @@ def log(event, details):
|
|||
|
||||
def dict_to_str(args: dict[str, Any], sep: str = "&") -> str:
|
||||
"""Convert a dictionary to URL."""
|
||||
return sep.join(f"{str(k)}=" + quote(str(args[k] or "")) for k in list(args))
|
||||
return sep.join(f"{k!s}=" + quote(str(args[k] or "")) for k in list(args))
|
||||
|
||||
|
||||
def list_to_str(seq, sep=", "):
|
||||
|
@ -623,7 +623,7 @@ def update_progress_bar(txt, i, l, absolute=False):
|
|||
|
||||
complete = int(float(i + 1) / l * col)
|
||||
completion_bar = ("=" * complete).ljust(col, " ")
|
||||
percent_complete = f"{str(int(float(i + 1) / l * 100))}%"
|
||||
percent_complete = f"{int(float(i + 1) / l * 100)!s}%"
|
||||
status = f"{i} of {l}" if absolute else percent_complete
|
||||
sys.stdout.write(f"\r{txt}: [{completion_bar}] {status}")
|
||||
sys.stdout.flush()
|
||||
|
|
|
@ -64,10 +64,10 @@ def enqueue(
|
|||
now: bool = False,
|
||||
enqueue_after_commit: bool = False,
|
||||
*,
|
||||
on_success: Callable = None,
|
||||
on_failure: Callable = None,
|
||||
on_success: Callable | None = None,
|
||||
on_failure: Callable | None = None,
|
||||
at_front: bool = False,
|
||||
job_id: str = None,
|
||||
job_id: str | None = None,
|
||||
deduplicate=False,
|
||||
**kwargs,
|
||||
) -> Job | Any:
|
||||
|
@ -264,7 +264,7 @@ def start_worker(
|
|||
rq_password: str | None = None,
|
||||
burst: bool = False,
|
||||
strategy: DequeueStrategy | None = DequeueStrategy.DEFAULT,
|
||||
) -> NoReturn | None: # pragma: no cover
|
||||
) -> None: # pragma: no cover
|
||||
"""Wrapper to start rq worker. Connects to redis and monitors these queues."""
|
||||
|
||||
if not strategy:
|
||||
|
@ -470,7 +470,7 @@ def get_redis_conn(username=None, password=None):
|
|||
raise
|
||||
except Exception as e:
|
||||
log(
|
||||
f"Please make sure that Redis Queue runs @ {frappe.get_conf().redis_queue}. Redis reported error: {str(e)}",
|
||||
f"Please make sure that Redis Queue runs @ {frappe.get_conf().redis_queue}. Redis reported error: {e!s}",
|
||||
colour="red",
|
||||
)
|
||||
raise
|
||||
|
|
|
@ -442,7 +442,7 @@ class BackupGenerator:
|
|||
cmd.append(bin)
|
||||
cmd.append(shlex.join(args))
|
||||
|
||||
command = " ".join(["set -o pipefail;"] + cmd + ["|", gzip_exc, ">>", self.backup_path_db])
|
||||
command = " ".join(["set -o pipefail;", *cmd, "|", gzip_exc, ">>", self.backup_path_db])
|
||||
if self.verbose:
|
||||
print(command.replace(shlex.quote(self.password), "*" * 10) + "\n")
|
||||
|
||||
|
|
|
@ -290,7 +290,7 @@ class PatchCreator:
|
|||
raise Exception(f"Patch {self.patch_file} already exists")
|
||||
|
||||
*path, _filename = self.patch_file.relative_to(self.app_dir.parents[0]).parts
|
||||
dotted_path = ".".join(path + [self.patch_file.stem])
|
||||
dotted_path = ".".join([*path, self.patch_file.stem])
|
||||
|
||||
patches_txt = self.app_dir / "patches.txt"
|
||||
existing_patches = patches_txt.read_text()
|
||||
|
|
|
@ -87,7 +87,7 @@ def site_cache(ttl: int | None = None, maxsize: int | None = None) -> Callable:
|
|||
calculate_pi(10) # will calculate value
|
||||
"""
|
||||
|
||||
def time_cache_wrapper(func: Callable = None) -> Callable:
|
||||
def time_cache_wrapper(func: Callable | None = None) -> Callable:
|
||||
func_key = f"{func.__module__}.{func.__name__}"
|
||||
|
||||
def clear_cache():
|
||||
|
@ -140,7 +140,7 @@ def redis_cache(ttl: int | None = 3600, user: str | bool | None = None) -> Calla
|
|||
user: `true` should cache be specific to session user.
|
||||
"""
|
||||
|
||||
def wrapper(func: Callable = None) -> Callable:
|
||||
def wrapper(func: Callable | None = None) -> Callable:
|
||||
func_key = f"{func.__module__}.{func.__qualname__}"
|
||||
|
||||
def clear_cache():
|
||||
|
|
|
@ -2031,7 +2031,8 @@ def get_filter(doctype: str, f: dict | list | tuple, filters_config=None) -> "fr
|
|||
"timespan",
|
||||
"previous",
|
||||
"next",
|
||||
) + NestedSetHierarchy
|
||||
*NestedSetHierarchy,
|
||||
)
|
||||
|
||||
if filters_config:
|
||||
additional_operators = [key.lower() for key in filters_config]
|
||||
|
@ -2444,7 +2445,7 @@ def parse_timedelta(s: str) -> datetime.timedelta:
|
|||
return datetime.timedelta(**{key: float(val) for key, val in m.groupdict().items()})
|
||||
|
||||
|
||||
def get_job_name(key: str, doctype: str = None, doc_name: str = None) -> str:
|
||||
def get_job_name(key: str, doctype: str | None = None, doc_name: str | None = None) -> str:
|
||||
job_name = key
|
||||
if doctype:
|
||||
job_name += f"_{doctype}"
|
||||
|
|
|
@ -54,7 +54,7 @@ def parse_date(date):
|
|||
date = date.split(" ", 1)[0]
|
||||
|
||||
# why the sorting? checking should be done in a predictable order
|
||||
check_formats = [None] + sorted(list(dateformats), reverse=not get_user_date_format().startswith("dd"))
|
||||
check_formats = [None, *sorted(list(dateformats), reverse=not get_user_date_format().startswith("dd"))]
|
||||
|
||||
for f in check_formats:
|
||||
try:
|
||||
|
|
|
@ -108,7 +108,7 @@ def format_value(value, df=None, doc=None, currency=None, translated=False, form
|
|||
elif df.get("fieldtype") == "Table MultiSelect":
|
||||
values = []
|
||||
meta = frappe.get_meta(df.options)
|
||||
link_field = [df for df in meta.fields if df.fieldtype == "Link"][0]
|
||||
link_field = next(df for df in meta.fields if df.fieldtype == "Link")
|
||||
for v in value:
|
||||
v.update({"__link_titles": doc.get("__link_titles")})
|
||||
formatted_value = frappe.format_value(v.get(link_field.fieldname, ""), link_field, v)
|
||||
|
|
|
@ -397,7 +397,7 @@ def sync_values(values: list):
|
|||
GlobalSearch = frappe.qb.Table("__global_search")
|
||||
conflict_fields = ["content", "published", "title", "route"]
|
||||
|
||||
query = frappe.qb.into(GlobalSearch).columns(["doctype", "name"] + conflict_fields).insert(*values)
|
||||
query = frappe.qb.into(GlobalSearch).columns(["doctype", "name", *conflict_fields]).insert(*values)
|
||||
|
||||
if frappe.db.db_type == "postgres":
|
||||
query = query.on_conflict(GlobalSearch.doctype, GlobalSearch.name)
|
||||
|
|
|
@ -33,7 +33,7 @@ def add_random_children(doc: "Document", fieldname: str, rows, randomize: dict,
|
|||
doc.append(fieldname, d)
|
||||
|
||||
|
||||
def get_random(doctype: str, filters: dict = None, doc: bool = False):
|
||||
def get_random(doctype: str, filters: dict | None = None, doc: bool = False):
|
||||
condition = []
|
||||
if filters:
|
||||
condition.extend("{}='{}'".format(key, str(val).replace("'", "'")) for key, val in filters.items())
|
||||
|
|
|
@ -151,7 +151,7 @@ def get_info_via_oauth(provider: str, code: str, decoder: Callable | None = None
|
|||
|
||||
if provider == "github" and not info.get("email"):
|
||||
emails = session.get("/user/emails", params=api_endpoint_args).json()
|
||||
email_dict = list(filter(lambda x: x.get("primary"), emails))[0]
|
||||
email_dict = next(filter(lambda x: x.get("primary"), emails))
|
||||
info["email"] = email_dict.get("email")
|
||||
|
||||
if not (info.get("email_verified") or info.get("email")):
|
||||
|
|
|
@ -17,7 +17,7 @@ if TYPE_CHECKING:
|
|||
from zxcvbn.matching import _Match
|
||||
|
||||
|
||||
def test_password_strength(password: str, user_inputs: "Iterable[object]" = None) -> "_Result":
|
||||
def test_password_strength(password: str, user_inputs: "Iterable[object] | None" = None) -> "_Result":
|
||||
"""Wrapper around zxcvbn.password_strength"""
|
||||
if len(password) > 128:
|
||||
# zxcvbn takes forever when checking long, random passwords.
|
||||
|
|
|
@ -232,7 +232,7 @@ def json_handler(obj):
|
|||
return repr(obj)
|
||||
|
||||
else:
|
||||
raise TypeError(f"""Object of type {type(obj)} with value of {repr(obj)} is not JSON serializable""")
|
||||
raise TypeError(f"""Object of type {type(obj)} with value of {obj!r} is not JSON serializable""")
|
||||
|
||||
|
||||
def as_page():
|
||||
|
|
|
@ -395,7 +395,7 @@ def get_python_builtins():
|
|||
}
|
||||
|
||||
|
||||
def get_hooks(hook: str = None, default=None, app_name: str = None) -> frappe._dict:
|
||||
def get_hooks(hook: str | None = None, default=None, app_name: str | None = None) -> frappe._dict:
|
||||
"""Get hooks via `app/hooks.py`
|
||||
|
||||
:param hook: Name of the hook. Will gather all hooks for this name and return as a list.
|
||||
|
|
|
@ -52,7 +52,9 @@ def qualified_name(obj) -> str:
|
|||
return f"{module}.{qualname}"
|
||||
|
||||
|
||||
def raise_type_error(arg_name: str, arg_type: type, arg_value: object, current_exception: Exception = None):
|
||||
def raise_type_error(
|
||||
arg_name: str, arg_type: type, arg_value: object, current_exception: Exception | None = None
|
||||
):
|
||||
"""
|
||||
Raise a TypeError with a message that includes the name of the argument, the expected type
|
||||
and the actual type of the value passed.
|
||||
|
|
|
@ -330,7 +330,7 @@ def add_system_manager(
|
|||
first_name: str | None = None,
|
||||
last_name: str | None = None,
|
||||
send_welcome_email: bool = False,
|
||||
password: str = None,
|
||||
password: str | None = None,
|
||||
) -> "User":
|
||||
# add user
|
||||
user = frappe.new_doc("User")
|
||||
|
|
|
@ -62,7 +62,7 @@ class TestBlogPost(FrappeTestCase):
|
|||
|
||||
# On blog post page find link to the category page
|
||||
soup = BeautifulSoup(blog_page_html, "html.parser")
|
||||
category_page_link = list(soup.find_all("a", href=re.compile(blog.blog_category)))[0]
|
||||
category_page_link = next(iter(soup.find_all("a", href=re.compile(blog.blog_category))))
|
||||
category_page_url = category_page_link["href"]
|
||||
|
||||
cached_value = frappe.db.value_cache.get(("DocType", "Blog Post", "name"))
|
||||
|
|
|
@ -49,7 +49,8 @@ class PathResolver:
|
|||
return endpoint, TemplatePage(endpoint, self.http_status_code)
|
||||
|
||||
custom_renderers = self.get_custom_page_renderers()
|
||||
renderers = custom_renderers + [
|
||||
renderers = [
|
||||
*custom_renderers,
|
||||
StaticPage,
|
||||
WebFormPage,
|
||||
DocumentPage,
|
||||
|
|
|
@ -115,7 +115,7 @@ def get_rendered_template(
|
|||
no_letterhead: bool | None = None,
|
||||
letterhead: str | None = None,
|
||||
trigger_print: bool = False,
|
||||
settings: dict = None,
|
||||
settings: dict | None = None,
|
||||
) -> str:
|
||||
print_settings = frappe.get_single("Print Settings").as_dict()
|
||||
print_settings.update(settings or {})
|
||||
|
|
|
@ -20,7 +20,7 @@ def get_context(context):
|
|||
|
||||
|
||||
@frappe.whitelist(allow_guest=True)
|
||||
def get_search_results(text: str, scope: str = None, start: int = 0, as_html: bool = False):
|
||||
def get_search_results(text: str, scope: str | None = None, start: int = 0, as_html: bool = False):
|
||||
results = web_search(text, scope, start, limit=21)
|
||||
out = frappe._dict()
|
||||
|
||||
|
|
|
@ -110,6 +110,7 @@ select = [
|
|||
"I",
|
||||
"UP",
|
||||
"B",
|
||||
"RUF",
|
||||
]
|
||||
ignore = [
|
||||
"B017", # assertRaises(Exception) - should be more specific
|
||||
|
@ -126,6 +127,7 @@ ignore = [
|
|||
"F722", # syntax error in forward type annotation
|
||||
"F821", # undefined name
|
||||
"W191", # indentation contains tabs
|
||||
"RUF001", # string contains ambiguous unicode character
|
||||
]
|
||||
|
||||
[tool.ruff.format]
|
||||
|
|
Loading…
Reference in New Issue
Block a user