mirror of
https://github.com/frappe/frappe.git
synced 2024-06-12 21:52:20 +00:00
593 lines
18 KiB
Python
593 lines
18 KiB
Python
import base64
|
|
import datetime
|
|
import hashlib
|
|
import re
|
|
from http import cookies
|
|
from urllib.parse import unquote, urljoin, urlparse
|
|
|
|
import jwt
|
|
import pytz
|
|
from oauthlib.openid import RequestValidator
|
|
|
|
import frappe
|
|
from frappe.auth import LoginManager
|
|
from frappe.utils.data import cstr, get_system_timezone, now_datetime
|
|
|
|
|
|
class OAuthWebRequestValidator(RequestValidator):
|
|
# Pre- and post-authorization.
|
|
def validate_client_id(self, client_id, request, *args, **kwargs):
|
|
# Simple validity check, does client exist? Not banned?
|
|
cli_id = frappe.db.get_value("OAuth Client", {"name": client_id})
|
|
if cli_id:
|
|
client = frappe.get_doc("OAuth Client", client_id)
|
|
if client.user_has_allowed_role():
|
|
request.client = client.as_dict()
|
|
return True
|
|
return False
|
|
|
|
def validate_redirect_uri(self, client_id, redirect_uri, request, *args, **kwargs):
|
|
# Is the client allowed to use the supplied redirect_uri? i.e. has
|
|
# the client previously registered this EXACT redirect uri.
|
|
|
|
redirect_uris = (
|
|
cstr(frappe.db.get_value("OAuth Client", client_id, "redirect_uris"))
|
|
.strip()
|
|
.split(get_url_delimiter())
|
|
)
|
|
|
|
if redirect_uri in redirect_uris:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def get_default_redirect_uri(self, client_id, request, *args, **kwargs):
|
|
# The redirect used if none has been supplied.
|
|
# Prefer your clients to pre register a redirect uri rather than
|
|
# supplying one on each authorization request.
|
|
return frappe.db.get_value("OAuth Client", client_id, "default_redirect_uri")
|
|
|
|
def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs):
|
|
# Is the client allowed to access the requested scopes?
|
|
allowed_scopes = get_client_scopes(client_id)
|
|
return all(scope in allowed_scopes for scope in scopes)
|
|
|
|
def get_default_scopes(self, client_id, request, *args, **kwargs):
|
|
# Scopes a client will authorize for if none are supplied in the
|
|
# authorization request.
|
|
scopes = get_client_scopes(client_id)
|
|
request.scopes = scopes # Apparently this is possible.
|
|
return scopes
|
|
|
|
def validate_response_type(self, client_id, response_type, client, request, *args, **kwargs):
|
|
allowed_response_types = [
|
|
# From OAuth Client response_type field
|
|
client.response_type.lower(),
|
|
# OIDC
|
|
"id_token",
|
|
"id_token token",
|
|
"code id_token",
|
|
"code token id_token",
|
|
]
|
|
|
|
return response_type in allowed_response_types
|
|
|
|
# Post-authorization
|
|
|
|
def save_authorization_code(self, client_id, code, request, *args, **kwargs):
|
|
cookie_dict = get_cookie_dict_from_headers(request)
|
|
|
|
oac = frappe.new_doc("OAuth Authorization Code")
|
|
oac.scopes = get_url_delimiter().join(request.scopes)
|
|
oac.redirect_uri_bound_to_authorization_code = request.redirect_uri
|
|
oac.client = client_id
|
|
oac.user = unquote(cookie_dict["user_id"].value)
|
|
oac.authorization_code = code["code"]
|
|
|
|
if request.nonce:
|
|
oac.nonce = request.nonce
|
|
|
|
if request.code_challenge and request.code_challenge_method:
|
|
oac.code_challenge = request.code_challenge
|
|
oac.code_challenge_method = request.code_challenge_method.lower()
|
|
|
|
oac.save(ignore_permissions=True)
|
|
frappe.db.commit()
|
|
|
|
def authenticate_client(self, request, *args, **kwargs):
|
|
# Get ClientID in URL
|
|
if request.client_id:
|
|
oc = frappe.get_doc("OAuth Client", request.client_id)
|
|
else:
|
|
# Extract token, instantiate OAuth Bearer Token and use clientid from there.
|
|
if "refresh_token" in frappe.form_dict:
|
|
oc = frappe.get_doc(
|
|
"OAuth Client",
|
|
frappe.db.get_value(
|
|
"OAuth Bearer Token",
|
|
{"refresh_token": frappe.form_dict["refresh_token"]},
|
|
"client",
|
|
),
|
|
)
|
|
elif "token" in frappe.form_dict:
|
|
oc = frappe.get_doc(
|
|
"OAuth Client",
|
|
frappe.db.get_value("OAuth Bearer Token", frappe.form_dict["token"], "client"),
|
|
)
|
|
else:
|
|
oc = frappe.get_doc(
|
|
"OAuth Client",
|
|
frappe.db.get_value(
|
|
"OAuth Bearer Token",
|
|
frappe.get_request_header("Authorization").split(" ")[1],
|
|
"client",
|
|
),
|
|
)
|
|
try:
|
|
request.client = request.client or oc.as_dict()
|
|
except Exception as e:
|
|
return generate_json_error_response(e)
|
|
|
|
cookie_dict = get_cookie_dict_from_headers(request)
|
|
user_id = unquote(cookie_dict.get("user_id").value) if "user_id" in cookie_dict else "Guest"
|
|
return frappe.session.user == user_id
|
|
|
|
def authenticate_client_id(self, client_id, request, *args, **kwargs):
|
|
cli_id = frappe.db.get_value("OAuth Client", client_id, "name")
|
|
if not cli_id:
|
|
# Don't allow public (non-authenticated) clients
|
|
return False
|
|
else:
|
|
request["client"] = frappe.get_doc("OAuth Client", cli_id)
|
|
return True
|
|
|
|
def validate_code(self, client_id, code, client, request, *args, **kwargs):
|
|
# Validate the code belongs to the client. Add associated scopes,
|
|
# state and user to request.scopes and request.user.
|
|
|
|
validcodes = frappe.get_all(
|
|
"OAuth Authorization Code",
|
|
filters={"client": client_id, "validity": "Valid"},
|
|
)
|
|
|
|
if code in [vcode["name"] for vcode in validcodes]:
|
|
request.scopes = frappe.db.get_value("OAuth Authorization Code", code, "scopes").split(
|
|
get_url_delimiter()
|
|
)
|
|
request.user = frappe.db.get_value("OAuth Authorization Code", code, "user")
|
|
code_challenge_method = frappe.db.get_value(
|
|
"OAuth Authorization Code", code, "code_challenge_method"
|
|
)
|
|
code_challenge = frappe.db.get_value("OAuth Authorization Code", code, "code_challenge")
|
|
|
|
if code_challenge and not request.code_verifier:
|
|
if frappe.db.exists("OAuth Authorization Code", code):
|
|
frappe.delete_doc("OAuth Authorization Code", code, ignore_permissions=True)
|
|
frappe.db.commit()
|
|
return False
|
|
|
|
if code_challenge_method == "s256":
|
|
m = hashlib.sha256()
|
|
m.update(bytes(request.code_verifier, "utf-8"))
|
|
code_verifier = base64.b64encode(m.digest()).decode("utf-8")
|
|
code_verifier = re.sub(r"\+", "-", code_verifier)
|
|
code_verifier = re.sub(r"\/", "_", code_verifier)
|
|
code_verifier = re.sub(r"=", "", code_verifier)
|
|
return code_challenge == code_verifier
|
|
|
|
elif code_challenge_method == "plain":
|
|
return code_challenge == request.code_verifier
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
def confirm_redirect_uri(self, client_id, code, redirect_uri, client, *args, **kwargs):
|
|
saved_redirect_uri = frappe.db.get_value("OAuth Client", client_id, "default_redirect_uri")
|
|
|
|
redirect_uris = frappe.db.get_value("OAuth Client", client_id, "redirect_uris")
|
|
|
|
if redirect_uris:
|
|
redirect_uris = redirect_uris.split(get_url_delimiter())
|
|
return redirect_uri in redirect_uris
|
|
|
|
return saved_redirect_uri == redirect_uri
|
|
|
|
def validate_grant_type(self, client_id, grant_type, client, request, *args, **kwargs):
|
|
# Clients should only be allowed to use one type of grant.
|
|
# In this case, it must be "authorization_code" or "refresh_token"
|
|
return grant_type in ["authorization_code", "refresh_token", "password"]
|
|
|
|
def save_bearer_token(self, token, request, *args, **kwargs):
|
|
# Remember to associate it with request.scopes, request.user and
|
|
# request.client. The two former will be set when you validate
|
|
# the authorization code. Don't forget to save both the
|
|
# access_token and the refresh_token and set expiration for the
|
|
# access_token to now + expires_in seconds.
|
|
|
|
otoken = frappe.new_doc("OAuth Bearer Token")
|
|
otoken.client = request.client["name"]
|
|
try:
|
|
otoken.user = (
|
|
request.user
|
|
if request.user
|
|
else frappe.db.get_value(
|
|
"OAuth Bearer Token",
|
|
{"refresh_token": request.body.get("refresh_token")},
|
|
"user",
|
|
)
|
|
)
|
|
except Exception:
|
|
otoken.user = frappe.session.user
|
|
|
|
otoken.scopes = get_url_delimiter().join(request.scopes)
|
|
otoken.access_token = token["access_token"]
|
|
otoken.refresh_token = token.get("refresh_token")
|
|
otoken.expires_in = token["expires_in"]
|
|
otoken.save(ignore_permissions=True)
|
|
frappe.db.commit()
|
|
|
|
return frappe.db.get_value("OAuth Client", request.client["name"], "default_redirect_uri")
|
|
|
|
def invalidate_authorization_code(self, client_id, code, request, *args, **kwargs):
|
|
# Authorization codes are use once, invalidate it when a Bearer token
|
|
# has been acquired.
|
|
|
|
frappe.db.set_value("OAuth Authorization Code", code, "validity", "Invalid")
|
|
frappe.db.commit()
|
|
|
|
# Protected resource request
|
|
|
|
def validate_bearer_token(self, token, scopes, request):
|
|
# Remember to check expiration and scope membership
|
|
otoken = frappe.get_doc("OAuth Bearer Token", token)
|
|
is_token_valid = (now_datetime() < otoken.expiration_time) and otoken.status != "Revoked"
|
|
client_scopes = frappe.db.get_value("OAuth Client", otoken.client, "scopes").split(
|
|
get_url_delimiter()
|
|
)
|
|
are_scopes_valid = all(scope in client_scopes for scope in scopes)
|
|
return is_token_valid and are_scopes_valid
|
|
|
|
# Token refresh request
|
|
|
|
def get_original_scopes(self, refresh_token, request, *args, **kwargs):
|
|
# Obtain the token associated with the given refresh_token and
|
|
# return its scopes, these will be passed on to the refreshed
|
|
# access token if the client did not specify a scope during the
|
|
# request.
|
|
obearer_token = frappe.get_doc("OAuth Bearer Token", {"refresh_token": refresh_token})
|
|
return obearer_token.scopes
|
|
|
|
def revoke_token(self, token, token_type_hint, request, *args, **kwargs):
|
|
"""Revoke an access or refresh token.
|
|
|
|
:param token: The token string.
|
|
:param token_type_hint: access_token or refresh_token.
|
|
:param request: The HTTP Request (oauthlib.common.Request)
|
|
|
|
Method is used by:
|
|
- Revocation Endpoint
|
|
"""
|
|
if token_type_hint == "access_token":
|
|
frappe.db.set_value("OAuth Bearer Token", token, "status", "Revoked")
|
|
elif token_type_hint == "refresh_token":
|
|
frappe.db.set_value("OAuth Bearer Token", {"refresh_token": token}, "status", "Revoked")
|
|
else:
|
|
frappe.db.set_value("OAuth Bearer Token", token, "status", "Revoked")
|
|
frappe.db.commit()
|
|
|
|
def validate_refresh_token(self, refresh_token, client, request, *args, **kwargs):
|
|
"""Ensure the Bearer token is valid and authorized access to scopes.
|
|
|
|
OBS! The request.user attribute should be set to the resource owner
|
|
associated with this refresh token.
|
|
|
|
:param refresh_token: Unicode refresh token
|
|
:param client: Client object set by you, see authenticate_client.
|
|
:param request: The HTTP Request (oauthlib.common.Request)
|
|
:rtype: True or False
|
|
|
|
Method is used by:
|
|
- Authorization Code Grant (indirectly by issuing refresh tokens)
|
|
- Resource Owner Password Credentials Grant (also indirectly)
|
|
- Refresh Token Grant
|
|
"""
|
|
|
|
otoken = frappe.get_doc("OAuth Bearer Token", {"refresh_token": refresh_token, "status": "Active"})
|
|
|
|
if not otoken:
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
# OpenID Connect
|
|
|
|
def finalize_id_token(self, id_token, token, token_handler, request):
|
|
# Check whether frappe server URL is set
|
|
id_token_header = {"typ": "jwt", "alg": "HS256"}
|
|
|
|
user = frappe.get_doc("User", request.user)
|
|
|
|
if request.nonce:
|
|
id_token["nonce"] = request.nonce
|
|
|
|
userinfo = get_userinfo(user)
|
|
|
|
id_token["exp"] = id_token.get("iat") + token.get("expires_in")
|
|
|
|
if userinfo.get("iss"):
|
|
id_token["iss"] = userinfo.get("iss")
|
|
|
|
if "openid" in request.scopes:
|
|
id_token.update(userinfo)
|
|
|
|
id_token_encoded = jwt.encode(
|
|
payload=id_token,
|
|
key=request.client.client_secret,
|
|
algorithm="HS256",
|
|
headers=id_token_header,
|
|
)
|
|
|
|
return frappe.safe_decode(id_token_encoded)
|
|
|
|
def get_authorization_code_nonce(self, client_id, code, redirect_uri, request):
|
|
if frappe.get_value("OAuth Authorization Code", code, "validity") == "Valid":
|
|
return frappe.get_value("OAuth Authorization Code", code, "nonce")
|
|
|
|
return None
|
|
|
|
def get_authorization_code_scopes(self, client_id, code, redirect_uri, request):
|
|
scope = frappe.get_value("OAuth Client", client_id, "scopes")
|
|
if not scope:
|
|
scope = []
|
|
else:
|
|
scope = scope.split(get_url_delimiter())
|
|
|
|
return scope
|
|
|
|
def get_jwt_bearer_token(self, token, token_handler, request):
|
|
now = datetime.datetime.now()
|
|
|
|
id_token = dict(
|
|
aud=token.client_id,
|
|
iat=round(now.timestamp()),
|
|
at_hash=calculate_at_hash(token.access_token, hashlib.sha256),
|
|
)
|
|
return self.finalize_id_token(id_token, token, token_handler, request)
|
|
|
|
def get_userinfo_claims(self, request):
|
|
user = frappe.get_doc("User", frappe.session.user)
|
|
return get_userinfo(user)
|
|
|
|
def validate_id_token(self, token, scopes, request):
|
|
try:
|
|
id_token = frappe.get_doc("OAuth Bearer Token", token)
|
|
if id_token.status == "Active":
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
return False
|
|
|
|
def validate_jwt_bearer_token(self, token, scopes, request):
|
|
try:
|
|
jwt = frappe.get_doc("OAuth Bearer Token", token)
|
|
if jwt.status == "Active":
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
return False
|
|
|
|
def validate_silent_authorization(self, request):
|
|
"""Ensure the logged in user has authorized silent OpenID authorization.
|
|
|
|
Silent OpenID authorization allows access tokens and id tokens to be
|
|
granted to clients without any user prompt or interaction.
|
|
|
|
:param request: The HTTP Request (oauthlib.common.Request)
|
|
:rtype: True or False
|
|
|
|
Method is used by:
|
|
- OpenIDConnectAuthCode
|
|
- OpenIDConnectImplicit
|
|
- OpenIDConnectHybrid
|
|
"""
|
|
if request.prompt == "login":
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def validate_silent_login(self, request):
|
|
"""Ensure session user has authorized silent OpenID login.
|
|
|
|
If no user is logged in or has not authorized silent login, this
|
|
method should return False.
|
|
|
|
If the user is logged in but associated with multiple accounts and
|
|
not selected which one to link to the token then this method should
|
|
raise an oauthlib.oauth2.AccountSelectionRequired error.
|
|
|
|
:param request: The HTTP Request (oauthlib.common.Request)
|
|
:rtype: True or False
|
|
|
|
Method is used by:
|
|
- OpenIDConnectAuthCode
|
|
- OpenIDConnectImplicit
|
|
- OpenIDConnectHybrid
|
|
"""
|
|
if frappe.session.user == "Guest" or request.prompt.lower() == "login":
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def validate_user_match(self, id_token_hint, scopes, claims, request):
|
|
"""Ensure client supplied user id hint matches session user.
|
|
|
|
If the sub claim or id_token_hint is supplied then the session
|
|
user must match the given ID.
|
|
|
|
:param id_token_hint: User identifier string.
|
|
:param scopes: List of OAuth 2 scopes and OpenID claims (strings).
|
|
:param claims: OpenID Connect claims dict.
|
|
:param request: The HTTP Request (oauthlib.common.Request)
|
|
:rtype: True or False
|
|
|
|
Method is used by:
|
|
- OpenIDConnectAuthCode
|
|
- OpenIDConnectImplicit
|
|
- OpenIDConnectHybrid
|
|
"""
|
|
if id_token_hint:
|
|
try:
|
|
user = None
|
|
payload = jwt.decode(
|
|
id_token_hint,
|
|
algorithms=["HS256"],
|
|
options={
|
|
"verify_signature": False,
|
|
"verify_aud": False,
|
|
},
|
|
)
|
|
client_id, client_secret = frappe.get_value(
|
|
"OAuth Client",
|
|
payload.get("aud"),
|
|
["client_id", "client_secret"],
|
|
)
|
|
|
|
if payload.get("sub") and client_id and client_secret:
|
|
user = frappe.db.get_value(
|
|
"User Social Login",
|
|
{"userid": payload.get("sub"), "provider": "frappe"},
|
|
"parent",
|
|
)
|
|
user = frappe.get_doc("User", user)
|
|
verified_payload = jwt.decode(
|
|
id_token_hint,
|
|
key=client_secret,
|
|
audience=client_id,
|
|
algorithms=["HS256"],
|
|
options={
|
|
"verify_exp": False,
|
|
},
|
|
)
|
|
|
|
if verified_payload:
|
|
return user.name == frappe.session.user
|
|
|
|
except Exception:
|
|
return False
|
|
|
|
elif frappe.session.user != "Guest":
|
|
return True
|
|
|
|
return False
|
|
|
|
def validate_user(self, username, password, client, request, *args, **kwargs):
|
|
"""Ensure the username and password is valid.
|
|
|
|
Method is used by:
|
|
- Resource Owner Password Credentials Grant
|
|
"""
|
|
login_manager = LoginManager()
|
|
login_manager.authenticate(username, password)
|
|
|
|
if login_manager.user == "Guest":
|
|
return False
|
|
|
|
request.user = login_manager.user
|
|
return True
|
|
|
|
|
|
def get_cookie_dict_from_headers(r):
|
|
cookie = cookies.BaseCookie()
|
|
if r.headers.get("Cookie"):
|
|
cookie.load(r.headers.get("Cookie"))
|
|
return cookie
|
|
|
|
|
|
def calculate_at_hash(access_token, hash_alg):
|
|
"""Helper method for calculating an access token
|
|
hash, as described in http://openid.net/specs/openid-connect-core-1_0.html#CodeIDToken
|
|
Its value is the base64url encoding of the left-most half of the hash of the octets
|
|
of the ASCII representation of the access_token value, where the hash algorithm
|
|
used is the hash algorithm used in the alg Header Parameter of the ID Token's JOSE
|
|
Header. For instance, if the alg is RS256, hash the access_token value with SHA-256,
|
|
then take the left-most 128 bits and base64url encode them. The at_hash value is a
|
|
case sensitive string.
|
|
Args:
|
|
access_token (str): An access token string.
|
|
hash_alg (callable): A callable returning a hash object, e.g. hashlib.sha256
|
|
"""
|
|
hash_digest = hash_alg(access_token.encode("utf-8")).digest()
|
|
cut_at = int(len(hash_digest) / 2)
|
|
truncated = hash_digest[:cut_at]
|
|
from jwt.utils import base64url_encode
|
|
|
|
at_hash = base64url_encode(truncated)
|
|
return at_hash.decode("utf-8")
|
|
|
|
|
|
def delete_oauth2_data():
|
|
frappe.db.delete("OAuth Authorization Code", {"validity": "Invalid"})
|
|
frappe.db.delete("OAuth Bearer Token", {"status": "Revoked"})
|
|
|
|
|
|
def get_client_scopes(client_id):
|
|
scopes_string = frappe.db.get_value("OAuth Client", client_id, "scopes")
|
|
return scopes_string.split()
|
|
|
|
|
|
def get_userinfo(user):
|
|
picture = None
|
|
frappe_server_url = get_server_url()
|
|
valid_url_schemes = ("http", "https", "ftp", "ftps")
|
|
|
|
if user.user_image:
|
|
if frappe.utils.validate_url(user.user_image, valid_schemes=valid_url_schemes):
|
|
picture = user.user_image
|
|
else:
|
|
picture = urljoin(frappe_server_url, user.user_image)
|
|
|
|
return frappe._dict(
|
|
{
|
|
"sub": frappe.db.get_value(
|
|
"User Social Login",
|
|
{"parent": user.name, "provider": "frappe"},
|
|
"userid",
|
|
),
|
|
"name": " ".join(filter(None, [user.first_name, user.last_name])),
|
|
"given_name": user.first_name,
|
|
"family_name": user.last_name,
|
|
"email": user.email,
|
|
"picture": picture,
|
|
"roles": frappe.get_roles(user.name),
|
|
"iss": frappe_server_url,
|
|
}
|
|
)
|
|
|
|
|
|
def get_url_delimiter(separator_character=" "):
|
|
return separator_character
|
|
|
|
|
|
def generate_json_error_response(e):
|
|
if not e:
|
|
e = frappe._dict({})
|
|
|
|
frappe.local.response = frappe._dict(
|
|
{
|
|
"description": getattr(e, "description", "Internal Server Error"),
|
|
"status_code": getattr(e, "status_code", 500),
|
|
"error": getattr(e, "error", "internal_server_error"),
|
|
}
|
|
)
|
|
frappe.local.response["http_status_code"] = getattr(e, "status_code", 500)
|
|
return
|
|
|
|
|
|
def get_server_url():
|
|
request_url = urlparse(frappe.request.url)
|
|
request_url = f"{request_url.scheme}://{request_url.netloc}"
|
|
return frappe.get_value("Social Login Key", "frappe", "base_url") or request_url
|