2
0
mirror of https://github.com/frappe/frappe.git synced 2024-06-13 00:42:21 +00:00
frappe/frappe/auth.py
Akhil Narang 68fd322955
fix: bypass IP restriction for the methods required for our socketio backend
Those requests are made from a separate backend, not by the user.

Signed-off-by: Akhil Narang <me@akhilnarang.dev>
2024-05-31 16:31:02 +05:30

699 lines
21 KiB
Python

# Copyright (c) 2021, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See LICENSE
import base64
import binascii
from urllib.parse import quote, urlencode, urlparse
from werkzeug.wrappers import Response
import frappe
import frappe.database
import frappe.utils
import frappe.utils.user
from frappe import _
from frappe.core.doctype.activity_log.activity_log import add_authentication_log
from frappe.sessions import Session, clear_sessions, delete_session, get_expiry_in_seconds
from frappe.translate import get_language
from frappe.twofactor import (
authenticate_for_2factor,
confirm_otp_token,
get_cached_user_pass,
should_run_2fa,
)
from frappe.utils import cint, date_diff, datetime, get_datetime, today
from frappe.utils.deprecations import deprecation_warning
from frappe.utils.password import check_password, get_decrypted_password
from frappe.website.utils import get_home_page
SAFE_HTTP_METHODS = frozenset(("GET", "HEAD", "OPTIONS"))
UNSAFE_HTTP_METHODS = frozenset(("POST", "PUT", "DELETE", "PATCH"))
MAX_PASSWORD_SIZE = 512
class HTTPRequest:
def __init__(self):
# set frappe.local.request_ip
self.set_request_ip()
# load cookies
self.set_cookies()
# login and start/resume user session
self.set_session()
# set request language
self.set_lang()
# match csrf token from current session
self.validate_csrf_token()
# write out latest cookies
frappe.local.cookie_manager.init_cookies()
@property
def domain(self):
if not getattr(self, "_domain", None):
self._domain = frappe.request.host
if self._domain and self._domain.startswith("www."):
self._domain = self._domain[4:]
return self._domain
def set_request_ip(self):
if frappe.get_request_header("X-Forwarded-For"):
frappe.local.request_ip = (frappe.get_request_header("X-Forwarded-For").split(",", 1)[0]).strip()
elif frappe.get_request_header("REMOTE_ADDR"):
frappe.local.request_ip = frappe.get_request_header("REMOTE_ADDR")
else:
frappe.local.request_ip = "127.0.0.1"
def set_cookies(self):
frappe.local.cookie_manager = CookieManager()
def set_session(self):
frappe.local.login_manager = LoginManager()
def validate_csrf_token(self):
if (
not frappe.request
or frappe.request.method not in UNSAFE_HTTP_METHODS
or frappe.conf.ignore_csrf
or not frappe.session
or not (saved_token := frappe.session.data.csrf_token)
or (
(frappe.get_request_header("X-Frappe-CSRF-Token") or frappe.form_dict.pop("csrf_token", None))
== saved_token
)
):
return
frappe.flags.disable_traceback = True
frappe.throw(_("Invalid Request"), frappe.CSRFTokenError)
def set_lang(self):
frappe.local.lang = get_language()
class LoginManager:
__slots__ = ("user", "info", "full_name", "user_type", "resume")
def __init__(self):
self.user = None
self.info = None
self.full_name = None
self.user_type = None
if frappe.local.form_dict.get("cmd") == "login" or frappe.local.request.path == "/api/method/login":
if self.login() is False:
return
self.resume = False
# run login triggers
self.run_trigger("on_session_creation")
else:
try:
self.resume = True
self.make_session(resume=True)
self.get_user_info()
self.set_user_info(resume=True)
except AttributeError:
self.user = "Guest"
self.get_user_info()
self.make_session()
self.set_user_info()
def login(self):
self.run_trigger("before_login")
if frappe.get_system_settings("disable_user_pass_login"):
frappe.throw(_("Login with username and password is not allowed."), frappe.AuthenticationError)
# clear cache
frappe.clear_cache(user=frappe.form_dict.get("usr"))
user, pwd = get_cached_user_pass()
self.authenticate(user=user, pwd=pwd)
if self.force_user_to_reset_password():
doc = frappe.get_doc("User", self.user)
frappe.local.response["redirect_to"] = doc.reset_password(send_email=False, password_expired=True)
frappe.local.response["message"] = "Password Reset"
return False
if should_run_2fa(self.user):
authenticate_for_2factor(self.user)
if not confirm_otp_token(self):
return False
frappe.form_dict.pop("pwd", None)
self.post_login()
def post_login(self):
self.run_trigger("on_login")
validate_ip_address(self.user)
self.validate_hour()
self.get_user_info()
self.make_session()
self.setup_boot_cache()
self.set_user_info()
def get_user_info(self):
self.info = frappe.get_cached_value(
"User", self.user, ["user_type", "first_name", "last_name", "user_image"], as_dict=1
)
self.user_type = self.info.user_type
def setup_boot_cache(self):
frappe.cache_manager.build_table_count_cache()
frappe.cache_manager.build_domain_restriced_doctype_cache()
frappe.cache_manager.build_domain_restriced_page_cache()
def set_user_info(self, resume=False):
# set sid again
frappe.local.cookie_manager.init_cookies()
self.full_name = " ".join(filter(None, [self.info.first_name, self.info.last_name]))
if self.info.user_type == "Website User":
frappe.local.cookie_manager.set_cookie("system_user", "no")
if not resume:
frappe.local.response["message"] = "No App"
frappe.local.response["home_page"] = "/" + get_home_page()
else:
frappe.local.cookie_manager.set_cookie("system_user", "yes")
if not resume:
frappe.local.response["message"] = "Logged In"
frappe.local.response["home_page"] = "/app"
if not resume:
frappe.response["full_name"] = self.full_name
# redirect information
redirect_to = frappe.cache.hget("redirect_after_login", self.user)
if redirect_to:
frappe.local.response["redirect_to"] = redirect_to
frappe.cache.hdel("redirect_after_login", self.user)
frappe.local.cookie_manager.set_cookie("full_name", self.full_name)
frappe.local.cookie_manager.set_cookie("user_id", self.user)
frappe.local.cookie_manager.set_cookie("user_image", self.info.user_image or "")
def clear_preferred_language(self):
frappe.local.cookie_manager.delete_cookie("preferred_language")
def make_session(self, resume=False):
# start session
frappe.local.session_obj = Session(
user=self.user, resume=resume, full_name=self.full_name, user_type=self.user_type
)
# reset user if changed to Guest
self.user = frappe.local.session_obj.user
frappe.local.session = frappe.local.session_obj.data
self.clear_active_sessions()
def clear_active_sessions(self):
"""Clear other sessions of the current user if `deny_multiple_sessions` is not set"""
if frappe.session.user == "Guest":
return
if not (
cint(frappe.conf.get("deny_multiple_sessions"))
or cint(frappe.db.get_system_setting("deny_multiple_sessions"))
):
return
clear_sessions(frappe.session.user, keep_current=True)
def authenticate(self, user: str | None = None, pwd: str | None = None):
from frappe.core.doctype.user.user import User
if not (user and pwd):
user, pwd = frappe.form_dict.get("usr"), frappe.form_dict.get("pwd")
if not (user and pwd):
self.fail(_("Incomplete login details"), user=user)
if len(pwd) > MAX_PASSWORD_SIZE:
self.fail(_("Password size exceeded the maximum allowed size"), user=user)
_raw_user_name = user
user = User.find_by_credentials(user, pwd)
ip_tracker = get_login_attempt_tracker(frappe.local.request_ip)
if not user:
ip_tracker and ip_tracker.add_failure_attempt()
self.fail("Invalid login credentials", user=_raw_user_name)
# Current login flow uses cached credentials for authentication while checking OTP.
# Incase of OTP check, tracker for auth needs to be disabled(If not, it can remove tracker history as it is going to succeed anyway)
# Tracker is activated for 2FA incase of OTP.
ignore_tracker = should_run_2fa(user.name) and ("otp" in frappe.form_dict)
user_tracker = None if ignore_tracker else get_login_attempt_tracker(user.name)
if not user.is_authenticated:
user_tracker and user_tracker.add_failure_attempt()
ip_tracker and ip_tracker.add_failure_attempt()
self.fail("Invalid login credentials", user=user.name)
elif not (user.name == "Administrator" or user.enabled):
user_tracker and user_tracker.add_failure_attempt()
ip_tracker and ip_tracker.add_failure_attempt()
self.fail("User disabled or missing", user=user.name)
else:
user_tracker and user_tracker.add_success_attempt()
ip_tracker and ip_tracker.add_success_attempt()
self.user = user.name
def force_user_to_reset_password(self):
if not self.user:
return
if self.user in frappe.STANDARD_USERS:
return False
reset_pwd_after_days = cint(frappe.get_system_settings("force_user_to_reset_password"))
if reset_pwd_after_days:
last_password_reset_date = (
frappe.db.get_value("User", self.user, "last_password_reset_date") or today()
)
last_pwd_reset_days = date_diff(today(), last_password_reset_date)
if last_pwd_reset_days > reset_pwd_after_days:
return True
def check_password(self, user, pwd):
"""check password"""
try:
# return user in correct case
return check_password(user, pwd)
except frappe.AuthenticationError:
self.fail("Incorrect password", user=user)
def fail(self, message, user=None):
if not user:
user = _("Unknown User")
frappe.local.response["message"] = message
add_authentication_log(message, user, status="Failed")
frappe.db.commit()
raise frappe.AuthenticationError
def run_trigger(self, event="on_login"):
for method in frappe.get_hooks().get(event, []):
frappe.call(frappe.get_attr(method), login_manager=self)
def validate_hour(self):
"""check if user is logging in during restricted hours"""
login_before = cint(frappe.db.get_value("User", self.user, "login_before", ignore=True))
login_after = cint(frappe.db.get_value("User", self.user, "login_after", ignore=True))
if not (login_before or login_after):
return
from frappe.utils import now_datetime
current_hour = int(now_datetime().strftime("%H"))
if login_before and current_hour >= login_before:
frappe.throw(_("Login not allowed at this time"), frappe.AuthenticationError)
if login_after and current_hour < login_after:
frappe.throw(_("Login not allowed at this time"), frappe.AuthenticationError)
def login_as_guest(self):
"""login as guest"""
self.login_as("Guest")
def login_as(self, user):
self.user = user
self.post_login()
def impersonate(self, user):
current_user = frappe.session.user
self.login_as(user)
# Flag this session as impersonated session, so other code can log this.
frappe.local.session_obj.set_impersonsated(current_user)
def logout(self, arg="", user=None):
if not user:
user = frappe.session.user
self.run_trigger("on_logout")
if user == frappe.session.user:
delete_session(frappe.session.sid, user=user, reason="User Manually Logged Out")
self.clear_cookies()
if frappe.request:
self.login_as_guest()
else:
clear_sessions(user)
def clear_cookies(self):
clear_cookies()
class CookieManager:
def __init__(self):
self.cookies = {}
self.to_delete = []
def init_cookies(self):
if not frappe.local.session.get("sid"):
return
if frappe.session.sid:
self.set_cookie("sid", frappe.session.sid, max_age=get_expiry_in_seconds(), httponly=True)
def set_cookie(
self,
key,
value,
expires=None,
secure=False,
httponly=False,
samesite="Lax",
max_age=None,
):
if not secure and hasattr(frappe.local, "request"):
secure = frappe.local.request.scheme == "https"
self.cookies[key] = {
"value": value,
"expires": expires,
"secure": secure,
"httponly": httponly,
"samesite": samesite,
"max_age": max_age,
}
def delete_cookie(self, to_delete):
if not isinstance(to_delete, list | tuple):
to_delete = [to_delete]
self.to_delete.extend(to_delete)
def flush_cookies(self, response: Response):
for key, opts in self.cookies.items():
response.set_cookie(
key,
quote((opts.get("value") or "").encode("utf-8")),
expires=opts.get("expires"),
secure=opts.get("secure"),
httponly=opts.get("httponly"),
samesite=opts.get("samesite"),
max_age=opts.get("max_age"),
)
# expires yesterday!
expires = datetime.datetime.now() + datetime.timedelta(days=-1)
for key in set(self.to_delete):
response.set_cookie(key, "", expires=expires)
@frappe.whitelist()
def get_logged_user():
return frappe.session.user
def clear_cookies():
if hasattr(frappe.local, "session"):
frappe.session.sid = ""
frappe.local.cookie_manager.delete_cookie(["full_name", "user_id", "sid", "user_image", "system_user"])
def validate_ip_address(user):
"""
Method to check if the user has IP restrictions enabled, and if so is the IP address they are
connecting from allowlisted.
Certain methods called from our socketio backend need direct access, and so the IP is not
checked for those
"""
if hasattr(frappe.local, "request") and frappe.local.request.path.startswith(
"/api/method/frappe.realtime."
):
return True
from frappe.core.doctype.user.user import get_restricted_ip_list
# Only fetch required fields - for perf
user_fields = ["restrict_ip", "bypass_restrict_ip_check_if_2fa_enabled"]
user_info = (
frappe.get_cached_value("User", user, user_fields, as_dict=True)
if not frappe.flags.in_test
else frappe.db.get_value("User", user, user_fields, as_dict=True)
)
ip_list = get_restricted_ip_list(user_info)
if not ip_list:
return
system_settings = (
frappe.get_cached_doc("System Settings")
if not frappe.flags.in_test
else frappe.get_single("System Settings")
)
# check if bypass restrict ip is enabled for all users
bypass_restrict_ip_check = system_settings.bypass_restrict_ip_check_if_2fa_enabled
# check if two factor auth is enabled
if system_settings.enable_two_factor_auth and not bypass_restrict_ip_check:
# check if bypass restrict ip is enabled for login user
bypass_restrict_ip_check = user_info.bypass_restrict_ip_check_if_2fa_enabled
for ip in ip_list:
if frappe.local.request_ip.startswith(ip) or bypass_restrict_ip_check:
return
frappe.throw(_("Access not allowed from this IP Address"), frappe.AuthenticationError)
def get_login_attempt_tracker(key: str, raise_locked_exception: bool = True):
"""Get login attempt tracker instance.
:param user_name: Name of the loggedin user
:param raise_locked_exception: If set, raises an exception incase of user not allowed to login
"""
sys_settings = frappe.get_doc("System Settings")
track_login_attempts = sys_settings.allow_consecutive_login_attempts > 0
tracker_kwargs = {}
if track_login_attempts:
tracker_kwargs["lock_interval"] = sys_settings.allow_login_after_fail
tracker_kwargs["max_consecutive_login_attempts"] = sys_settings.allow_consecutive_login_attempts
tracker = LoginAttemptTracker(key, **tracker_kwargs)
if raise_locked_exception and track_login_attempts and not tracker.is_user_allowed():
frappe.throw(
_("Your account has been locked and will resume after {0} seconds").format(
sys_settings.allow_login_after_fail
),
frappe.SecurityException,
)
return tracker
class LoginAttemptTracker:
"""Track login attemts of a user.
Lock the account for s number of seconds if there have been n consecutive unsuccessful attempts to log in.
"""
def __init__(
self,
key: str,
max_consecutive_login_attempts: int = 3,
lock_interval: int = 5 * 60,
*,
user_name: str | None = None,
):
"""Initialize the tracker.
:param user_name: Name of the loggedin user
:param max_consecutive_login_attempts: Maximum allowed consecutive failed login attempts
:param lock_interval: Locking interval incase of maximum failed attempts
"""
if user_name:
deprecation_warning("`username` parameter is deprecated, use `key` instead.")
self.key = key or user_name
self.lock_interval = datetime.timedelta(seconds=lock_interval)
self.max_failed_logins = max_consecutive_login_attempts
@property
def login_failed_count(self):
return frappe.cache.hget("login_failed_count", self.key)
@login_failed_count.setter
def login_failed_count(self, count):
frappe.cache.hset("login_failed_count", self.key, count)
@login_failed_count.deleter
def login_failed_count(self):
frappe.cache.hdel("login_failed_count", self.key)
@property
def login_failed_time(self):
"""First failed login attempt time within lock interval.
For every user we track only First failed login attempt time within lock interval of time.
"""
return frappe.cache.hget("login_failed_time", self.key)
@login_failed_time.setter
def login_failed_time(self, timestamp):
frappe.cache.hset("login_failed_time", self.key, timestamp)
@login_failed_time.deleter
def login_failed_time(self):
frappe.cache.hdel("login_failed_time", self.key)
def add_failure_attempt(self):
"""Log user failure attempts into the system.
Increase the failure count if new failure is with in current lock interval time period, if not reset the login failure count.
"""
login_failed_time = self.login_failed_time
login_failed_count = self.login_failed_count # Consecutive login failure count
current_time = get_datetime()
if not (login_failed_time and login_failed_count):
login_failed_time, login_failed_count = current_time, 0
if login_failed_time + self.lock_interval > current_time:
login_failed_count += 1
else:
login_failed_time, login_failed_count = current_time, 1
self.login_failed_time = login_failed_time
self.login_failed_count = login_failed_count
def add_success_attempt(self):
"""Reset login failures."""
del self.login_failed_count
del self.login_failed_time
def is_user_allowed(self) -> bool:
"""Is user allowed to login
User is not allowed to login if login failures are greater than threshold within in lock interval from first login failure.
"""
login_failed_time = self.login_failed_time
login_failed_count = self.login_failed_count or 0
current_time = get_datetime()
if (
login_failed_time
and login_failed_time + self.lock_interval > current_time
and login_failed_count > self.max_failed_logins
):
return False
return True
def validate_auth():
"""
Authenticate and sets user for the request.
"""
authorization_header = frappe.get_request_header("Authorization", "").split(" ")
if len(authorization_header) == 2:
validate_oauth(authorization_header)
validate_auth_via_api_keys(authorization_header)
validate_auth_via_hooks()
# If login via bearer, basic or keypair didn't work then authentication failed and we
# should terminate here.
if len(authorization_header) == 2 and frappe.session.user in ("", "Guest"):
raise frappe.AuthenticationError
def validate_oauth(authorization_header):
"""
Authenticate request using OAuth and set session user
Args:
authorization_header (list of str): The 'Authorization' header containing the prefix and token
"""
from frappe.integrations.oauth2 import get_oauth_server
from frappe.oauth import get_url_delimiter
if authorization_header[0].lower() != "bearer":
return
form_dict = frappe.local.form_dict
token = authorization_header[1]
req = frappe.request
parsed_url = urlparse(req.url)
access_token = {"access_token": token}
uri = parsed_url.scheme + "://" + parsed_url.netloc + parsed_url.path + "?" + urlencode(access_token)
http_method = req.method
headers = req.headers
body = req.get_data()
if req.content_type and "multipart/form-data" in req.content_type:
body = None
try:
required_scopes = frappe.db.get_value("OAuth Bearer Token", token, "scopes").split(
get_url_delimiter()
)
valid, oauthlib_request = get_oauth_server().verify_request(
uri, http_method, body, headers, required_scopes
)
if valid:
frappe.set_user(frappe.db.get_value("OAuth Bearer Token", token, "user"))
frappe.local.form_dict = form_dict
except AttributeError:
pass
def validate_auth_via_api_keys(authorization_header):
"""
Authenticate request using API keys and set session user
Args:
authorization_header (list of str): The 'Authorization' header containing the prefix and token
"""
try:
auth_type, auth_token = authorization_header
authorization_source = frappe.get_request_header("Frappe-Authorization-Source")
if auth_type.lower() == "basic":
api_key, api_secret = frappe.safe_decode(base64.b64decode(auth_token)).split(":")
validate_api_key_secret(api_key, api_secret, authorization_source)
elif auth_type.lower() == "token":
api_key, api_secret = auth_token.split(":")
validate_api_key_secret(api_key, api_secret, authorization_source)
except binascii.Error:
frappe.throw(
_("Failed to decode token, please provide a valid base64-encoded token."),
frappe.InvalidAuthorizationToken,
)
except (AttributeError, TypeError, ValueError):
pass
def validate_api_key_secret(api_key, api_secret, frappe_authorization_source=None):
"""frappe_authorization_source to provide api key and secret for a doctype apart from User"""
doctype = frappe_authorization_source or "User"
doc = frappe.db.get_value(doctype=doctype, filters={"api_key": api_key}, fieldname=["name"])
if not doc:
raise frappe.AuthenticationError
form_dict = frappe.local.form_dict
doc_secret = get_decrypted_password(doctype, doc, fieldname="api_secret")
if api_secret == doc_secret:
if doctype == "User":
user = frappe.db.get_value(doctype="User", filters={"api_key": api_key}, fieldname=["name"])
else:
user = frappe.db.get_value(doctype, doc, "user")
if frappe.local.login_manager.user in ("", "Guest"):
frappe.set_user(user)
frappe.local.form_dict = form_dict
else:
raise frappe.AuthenticationError
def validate_auth_via_hooks():
for auth_hook in frappe.get_hooks("auth_hooks", []):
frappe.get_attr(auth_hook)()