diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md
index 93821ef..7147041 100644
--- a/DEVELOPMENT.md
+++ b/DEVELOPMENT.md
@@ -36,6 +36,11 @@ Then run :
```
pip install pip-tools
-pip-compile requirements.in # freeze package versions
-pip-compile requirements-test.in
+pip-compile --output-file=requirements.txt pyproject.toml # freeze package versions
+pip-compile --output-file=requirements-test.txt requirements-test.in
+```
+
+FIXME: possible alternative for tests requirements would be:
+```
+python -m piptools compile --extra test -o requirements-test.txt pyproject.toml
```
diff --git a/django_pyoidc/VERSION b/django_pyoidc/VERSION
index 1750564..5a5831a 100644
--- a/django_pyoidc/VERSION
+++ b/django_pyoidc/VERSION
@@ -1 +1 @@
-0.0.6
+0.0.7
diff --git a/django_pyoidc/__init__.py b/django_pyoidc/__init__.py
index b6caaf1..25497cb 100644
--- a/django_pyoidc/__init__.py
+++ b/django_pyoidc/__init__.py
@@ -1,23 +1,75 @@
from typing import Dict
from django.contrib.auth import get_user_model
+from django.core.exceptions import SuspiciousOperation
+from django_pyoidc.exceptions import ClaimNotFoundError
+from django_pyoidc.utils import extract_claim_from_tokens
-def get_user_by_email(userinfo_token: Dict[str, str], id_token_claims: Dict):
+
+def get_user_by_email(tokens: Dict):
User = get_user_model()
- username = ""
+ username = None
+ preferred_username = None
+ email = None
+ client_host = None
+ client_address = None
+ client_id = None
+ django_username = None
+
+ try:
+ email = extract_claim_from_tokens("email", tokens)
+ except ClaimNotFoundError:
+ pass
+
+ try:
+ preferred_username = extract_claim_from_tokens("preferred_username", tokens)
+ except ClaimNotFoundError:
+ pass
- if "preferred_username" in id_token_claims:
- username = id_token_claims["preferred_username"]
- elif "preferred_username" in userinfo_token:
- username = userinfo_token["preferred_username"]
+ try:
+ username = extract_claim_from_tokens("username", tokens)
+ except ClaimNotFoundError:
+ pass
+
+ if preferred_username is None:
+ if username is None:
+ if email:
+ django_username = email
+ else:
+ raise SuspiciousOperation(
+ "Cannot extract username or email from available OIDC tokens."
+ )
+ else:
+ django_username = username
else:
- username = userinfo_token["email"]
+ django_username = preferred_username
+
+ # Currently client Credential logins in Keycloak adds these mappers in access tokens.
+ # we can use that to detect M2M accounts.
+ # TODO: check if any other mapper can/should be used for other providers
+ try:
+ client_host = extract_claim_from_tokens("clientHost", tokens)
+ except ClaimNotFoundError:
+ pass
+ try:
+ client_address = extract_claim_from_tokens("clientAddress", tokens)
+ except ClaimNotFoundError:
+ pass
+
+ if email is None and client_host is not None and client_address is not None:
+ # that's a M2M connexion in grant=client_credential mode
+ try:
+ client_id = extract_claim_from_tokens("client_id", tokens)
+ except ClaimNotFoundError:
+ client_id = django_username
+ # Build a fake email for the service accounts
+ email = f"{client_id}@localhost.lan"
user, created = User.objects.get_or_create(
- email=userinfo_token["email"],
- username=username,
+ email=email,
+ username=django_username,
)
user.backend = "django.contrib.auth.backends.ModelBackend"
return user
diff --git a/django_pyoidc/client.py b/django_pyoidc/client.py
new file mode 100644
index 0000000..56cb604
--- /dev/null
+++ b/django_pyoidc/client.py
@@ -0,0 +1,59 @@
+import logging
+
+# import oic
+from oic.extension.client import Client as ClientExtension
+from oic.oic.consumer import Consumer
+from oic.utils.authn.client import CLIENT_AUTHN_METHOD
+
+from django_pyoidc.session import OIDCCacheSessionBackendForDjango
+from django_pyoidc.utils import OIDCCacheBackendForDjango, get_setting_for_sso_op
+
+logger = logging.getLogger(__name__)
+
+
+class OIDCClient:
+ def __init__(self, op_name, session_id=None):
+ self._op_name = op_name
+
+ self.session_cache_backend = OIDCCacheSessionBackendForDjango(self._op_name)
+ self.general_cache_backend = OIDCCacheBackendForDjango(self._op_name)
+
+ consumer_config = {
+ # "debug": True,
+ "response_type": "code"
+ }
+
+ client_config = {
+ "client_id": get_setting_for_sso_op(op_name, "OIDC_CLIENT_ID"),
+ "client_authn_method": CLIENT_AUTHN_METHOD,
+ }
+
+ self.consumer = Consumer(
+ session_db=self.session_cache_backend,
+ consumer_config=consumer_config,
+ client_config=client_config,
+ )
+ # used in token introspection
+ self.client_extension = ClientExtension(**client_config)
+
+ provider_info_uri = get_setting_for_sso_op(
+ op_name, "OIDC_PROVIDER_DISCOVERY_URI"
+ )
+ client_secret = get_setting_for_sso_op(op_name, "OIDC_CLIENT_SECRET")
+ self.client_extension.client_secret = client_secret
+
+ if session_id:
+ self.consumer.restore(session_id)
+ else:
+
+ cache_key = self.general_cache_backend.generate_hashed_cache_key(
+ provider_info_uri
+ )
+ try:
+ config = self.general_cache_backend[cache_key]
+ except KeyError:
+ config = self.consumer.provider_config(provider_info_uri)
+ # shared microcache for provider config
+ # FIXME: Setting for duration
+ self.general_cache_backend.set(cache_key, config, 60)
+ self.consumer.client_secret = client_secret
diff --git a/django_pyoidc/drf/authentication.py b/django_pyoidc/drf/authentication.py
new file mode 100644
index 0000000..6470166
--- /dev/null
+++ b/django_pyoidc/drf/authentication.py
@@ -0,0 +1,126 @@
+import logging
+
+from django.conf import settings
+from django.core.exceptions import PermissionDenied
+from rest_framework import exceptions
+from rest_framework.authentication import BaseAuthentication
+
+from django_pyoidc.client import OIDCClient
+from django_pyoidc.engine import OIDCEngine
+from django_pyoidc.utils import (
+ OIDCCacheBackendForDjango,
+ check_audience,
+ get_setting_for_sso_op,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class OIDCBearerAuthentication(BaseAuthentication):
+ def __init__(self, *args, **kwargs):
+ super(OIDCBearerAuthentication, self).__init__(*args, **kwargs)
+ self.op_name = self.extract_drf_opname()
+ self.general_cache_backend = OIDCCacheBackendForDjango(self.op_name)
+ self.client = OIDCClient(self.op_name)
+ self.engine = OIDCEngine(self.op_name)
+
+ def extract_drf_opname(self):
+ """
+ Given a list of opnames and setting in DJANGO_PYOIDC conf, extract the one having USED_BY_REST_FRAMEWORK=True.
+ """
+ op = None
+ found = False
+ for op_name, configs in settings.DJANGO_PYOIDC.items():
+ if (
+ "USED_BY_REST_FRAMEWORK" in configs
+ and configs["USED_BY_REST_FRAMEWORK"]
+ ):
+ if found:
+ raise RuntimeError(
+ "Several DJANGO_PYOIDC sections are declared as USED_BY_REST_FRAMEWORK, only one should be used."
+ )
+ found = True
+ op = op_name
+ if found:
+ return op
+ else:
+ raise RuntimeError(
+ "No DJANGO_PYOIDC sections are declared with USED_BY_REST_FRAMEWORK configuration option."
+ )
+
+ def extract_access_token(self, request) -> str:
+ val = request.headers.get("Authorization")
+ if not val:
+ msg = "Request missing the authorization header."
+ raise RuntimeError(msg)
+ val = val.strip()
+ bearer_name, access_token_jwt = val.split(maxsplit=1)
+ requested_bearer_name = get_setting_for_sso_op(
+ self.op_name, "OIDC_API_BEARER_NAME", "Bearer"
+ )
+ if not bearer_name.lower() == requested_bearer_name.lower():
+ msg = f"Bad authorization header, invalid Keyword for the bearer, expecting {requested_bearer_name}."
+ raise RuntimeError(msg)
+ return access_token_jwt
+
+ def authenticate(self, request):
+ """
+ Returns two-tuple of (user, token) if authentication succeeds,
+ or None otherwise.
+ """
+ try:
+ user = None
+ access_token_claims = None
+
+ # Extract the access token from an HTTP Authorization Bearer header
+ try:
+ access_token_jwt = self.extract_access_token(request)
+ except RuntimeError as e:
+ logger.error(e)
+ # we return None, and not an Error.
+ # API auth failed, but maybe anon access is allowed
+ return None
+
+ # This introspection of the token is made by the SSO server
+ # so it is quite slow, but there's a cache added based on the token expiration
+ access_token_claims = self.engine.introspect_access_token(
+ access_token_jwt, client=self.client
+ )
+ logger.debug(access_token_claims)
+ if not access_token_claims.get("active"):
+ msg = "Inactive access token."
+ raise exceptions.AuthenticationFailed(msg)
+
+ # FIXME: add an option to request userinfo here, but that may be quite slow
+
+ if access_token_claims:
+ logger.debug("Request has valid access token.")
+
+ # FIXME: Add a setting to disable
+ client_id = get_setting_for_sso_op(self.op_name, "OIDC_CLIENT_ID")
+ if not check_audience(client_id, access_token_claims):
+ raise PermissionDenied(
+ f"Invalid result for acces token audiences check for {client_id}."
+ )
+
+ logger.debug("Let application load user via user hook.")
+ user = self.engine.call_get_user_function(
+ tokens={
+ "access_token_jwt": access_token_jwt,
+ "access_token_claims": access_token_claims,
+ }
+ )
+
+ if not user:
+ logger.error(
+ "OIDC Bearer Authentication process failure. Cannot set active authenticated user."
+ )
+ return None
+
+ except PermissionDenied as exp:
+ raise exp
+ except Exception as exp:
+ logger.exception(exp)
+ return None
+
+ return (user, access_token_claims)
diff --git a/django_pyoidc/engine.py b/django_pyoidc/engine.py
new file mode 100644
index 0000000..78138bc
--- /dev/null
+++ b/django_pyoidc/engine.py
@@ -0,0 +1,78 @@
+import datetime
+import logging
+
+from django_pyoidc import get_user_by_email
+from django_pyoidc.client import OIDCClient
+from django_pyoidc.utils import (
+ OIDCCacheBackendForDjango,
+ get_setting_for_sso_op,
+ import_object,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class OIDCEngine:
+ def __init__(self, op_name: str):
+ self.op_name = op_name
+ self.general_cache_backend = OIDCCacheBackendForDjango(self.op_name)
+
+ def call_function(self, setting_name, *args, **kwargs):
+ function_path = get_setting_for_sso_op(self.op_name, setting_name)
+ if function_path:
+ func = import_object(function_path, "")
+ return func(*args, **kwargs)
+
+ def call_get_user_function(self, tokens={}):
+ if get_setting_for_sso_op(self.op_name, "HOOK_GET_USER"):
+ logger.debug("OIDC, Calling user hook on get_user")
+ return self.call_function("HOOK_GET_USER", tokens)
+ else:
+ return get_user_by_email(tokens)
+
+ def introspect_access_token(self, access_token_jwt: str, client: OIDCClient):
+ """
+ Perform a cached intropesction call to extract claims from encoded jwt of the access_token
+ """
+ # FIXME: allow a non-cached mode by global settings
+ access_token_claims = None
+
+ # FIXME: in what case could we not have an access token available?
+ # should we raise an error then?
+ if access_token_jwt is not None:
+ cache_key = self.general_cache_backend.generate_hashed_cache_key(
+ access_token_jwt
+ )
+ try:
+ access_token_claims = self.general_cache_backend["cache_key"]
+ except KeyError:
+ # CACHE MISS
+
+ # RFC 7662: token introspection: ask SSO to validate and render the jwt as json
+ # this means a slow web call
+ request_args = {
+ "token": access_token_jwt,
+ "token_type_hint": "access_token",
+ }
+ client_auth_method = client.consumer.registration_response.get(
+ "introspection_endpoint_auth_method", "client_secret_basic"
+ )
+ introspection = client.client_extension.do_token_introspection(
+ request_args=request_args,
+ authn_method=client_auth_method,
+ endpoint=client.consumer.introspection_endpoint,
+ )
+ access_token_claims = introspection.to_dict()
+
+ # store it in cache
+ current = datetime.datetime.now().strftime("%s")
+ if "exp" not in access_token_claims:
+ raise RuntimeError("No expiry set on the access token.")
+ access_token_expiry = access_token_claims["exp"]
+ exp = int(access_token_expiry) - int(current)
+ logger.debug(
+ f"Token expiry: {exp} - current is {current} "
+ f"and expiry is set to {access_token_expiry} in the token"
+ )
+ self.general_cache_backend.set(cache_key, access_token_claims, exp)
+ return access_token_claims
diff --git a/django_pyoidc/exceptions.py b/django_pyoidc/exceptions.py
new file mode 100644
index 0000000..fc9ec8d
--- /dev/null
+++ b/django_pyoidc/exceptions.py
@@ -0,0 +1,6 @@
+class InvalidSIDException(Exception):
+ pass
+
+
+class ClaimNotFoundError(Exception):
+ pass
diff --git a/django_pyoidc/utils.py b/django_pyoidc/utils.py
index 9be9a9c..d5ad963 100644
--- a/django_pyoidc/utils.py
+++ b/django_pyoidc/utils.py
@@ -1,10 +1,13 @@
import hashlib
import logging
-from typing import Dict, Union
+from importlib import import_module
+from typing import Any, Dict, Union
from django.conf import settings
from django.core.cache import BaseCache, caches
+from django_pyoidc.exceptions import ClaimNotFoundError
+
logger = logging.getLogger(__name__)
@@ -19,6 +22,57 @@ def get_settings_for_sso_op(op_name: str):
return settings.DJANGO_PYOIDC[op_name]
+def import_object(path, def_name):
+ try:
+ mod, cls = path.split(":", 1)
+ except ValueError:
+ mod = path
+ cls = def_name
+
+ return getattr(import_module(mod), cls)
+
+
+def extract_claim_from_tokens(claim: str, tokens: dict) -> Any:
+ """Given a dictionnary of tokens claims, extract the given claim.
+
+ This function will seek in "info_token_claims", then "id_token_claims"
+ and finally "access_token_claims".
+ If the claim is not found a ClaimNotFoundError exception is raised.
+ """
+ if "info_token_claims" in tokens and claim in tokens["info_token_claims"]:
+ value = tokens["info_token_claims"][claim]
+ elif "id_token_claims" in tokens and claim in tokens["id_token_claims"]:
+ value = tokens["id_token_claims"][claim]
+ elif "access_token_claims" and claim in tokens["access_token_claims"]:
+ value = tokens["access_token_claims"][claim]
+ else:
+ raise ClaimNotFoundError(f"{claim} not found in available OIDC tokens.")
+ return value
+
+
+def check_audience(client_id: str, access_token_claims: dict) -> bool:
+ """Verify that the current client_id is present in 'aud' claim.
+
+ Audences are stored in 'aud' claim.
+ Audiences of an access token is a list of client_id where this token is allowed.
+ When receiving an access token in 'API' bearer mode checking that your client_id
+ is in the audience is a must.
+ Access tokens received in 'full' mode, when this Django is the OIDC client
+ managing the redirections to the SSO server may not always contain the client_id
+ in 'aud'. This is the case for Keycloak for example, where the 'aud' would only
+ contain 'others' client_id where this token can be used, and not the one generating it.
+ Audience in userinfo and id tokens are different beasts.
+ """
+ if "aud" not in access_token_claims:
+ return False
+ if client_id not in access_token_claims["aud"]:
+ logger.error(
+ f"{client_id} not found in access_token_claims['aud']: {access_token_claims['aud']}"
+ )
+ return False
+ return True
+
+
class OIDCCacheBackendForDjango:
"""Implement General cache for OIDC using django cache"""
diff --git a/django_pyoidc/views.py b/django_pyoidc/views.py
index 8bbfc14..8dbc8b4 100644
--- a/django_pyoidc/views.py
+++ b/django_pyoidc/views.py
@@ -1,4 +1,3 @@
-import datetime
import logging
from importlib import import_module
@@ -14,86 +13,18 @@
from django.views.decorators.csrf import csrf_exempt
from jwt import JWT
from jwt.exceptions import JWTDecodeError
-from oic.extension.client import Client as ClientExtension
-from oic.oic.consumer import Consumer
-from oic.utils.authn.client import CLIENT_AUTHN_METHOD
-from django_pyoidc import get_user_by_email
+from django_pyoidc.client import OIDCClient
+from django_pyoidc.engine import OIDCEngine
+from django_pyoidc.exceptions import InvalidSIDException
from django_pyoidc.models import OIDCSession
-from django_pyoidc.session import OIDCCacheSessionBackendForDjango
-from django_pyoidc.utils import (
- OIDCCacheBackendForDjango,
- get_setting_for_sso_op,
- get_settings_for_sso_op,
-)
+from django_pyoidc.utils import get_setting_for_sso_op, import_object
SessionStore = import_module(settings.SESSION_ENGINE).SessionStore
logger = logging.getLogger(__name__)
-class InvalidSIDException(Exception):
- pass
-
-
-def _import_object(path, def_name):
- try:
- mod, cls = path.split(":", 1)
- except ValueError:
- mod = path
- cls = def_name
-
- return getattr(import_module(mod), cls)
-
-
-class OIDClient:
- def __init__(self, op_name, session_id=None):
- self._op_name = op_name
-
- self.session_cache_backend = OIDCCacheSessionBackendForDjango(self._op_name)
- self.general_cache_backend = OIDCCacheBackendForDjango(self._op_name)
-
- consumer_config = {
- # "debug": True,
- "response_type": "code"
- }
-
- client_config = {
- "client_id": get_setting_for_sso_op(op_name, "OIDC_CLIENT_ID"),
- "client_authn_method": CLIENT_AUTHN_METHOD,
- }
-
- self.consumer = Consumer(
- session_db=self.session_cache_backend,
- consumer_config=consumer_config,
- client_config=client_config,
- )
- # used in token introspection
- self.client_extension = ClientExtension(**client_config)
-
- provider_info_uri = get_setting_for_sso_op(
- op_name, "OIDC_PROVIDER_DISCOVERY_URI"
- )
- client_secret = get_setting_for_sso_op(op_name, "OIDC_CLIENT_SECRET")
- self.client_extension.client_secret = client_secret
-
- if session_id:
- self.consumer.restore(session_id)
- else:
-
- cache_key = self.general_cache_backend.generate_hashed_cache_key(
- provider_info_uri
- )
- try:
- config = self.general_cache_backend[cache_key]
- except KeyError:
- config = self.consumer.provider_config(provider_info_uri)
- # shared microcache for provider config
- # FIXME: Setting for duration
- self.general_cache_backend.set(cache_key, config, 60)
- self.consumer.client_secret = client_secret
-
-
class OIDCMixin:
op_name = None
@@ -111,18 +42,9 @@ def get_setting(self, name, default=None):
def call_function(self, setting_name, *args, **kwargs):
function_path = get_setting_for_sso_op(self.op_name, setting_name)
if function_path:
- func = _import_object(function_path, "")
+ func = import_object(function_path, "")
return func(*args, **kwargs)
- def call_get_user_function(self, tokens={}):
- if "HOOK_GET_USER" in get_settings_for_sso_op(self.op_name):
- logger.debug("OIDC, Calling user hook on get_user")
- return self.call_function("HOOK_GET_USER", tokens)
- else:
- return get_user_by_email(
- tokens["info_token_claims"], tokens["id_token_claims"]
- )
-
def call_callback_function(self, request, user):
logger.debug("OIDC, Calling user hook on login")
self.call_function("HOOK_USER_LOGIN", request, user)
@@ -145,7 +67,7 @@ def get_next_url(self, request, redirect_field_name):
Adapted from https://github.com/mozilla/mozilla-django-oidc/blob/71e4af8283a10aa51234de705d34cd298e927f97/mozilla_django_oidc/views.py#L132
"""
next_url = request.GET.get(redirect_field_name)
- print(f"{next_url=}")
+ # print(f"{next_url=}")
if next_url:
is_safe = url_has_allowed_host_and_scheme(
next_url,
@@ -179,9 +101,9 @@ class OIDCLoginView(OIDCView):
def get(self, request, *args, **kwargs):
super().get(request, *args, **kwargs)
- client = OIDClient(self.op_name)
- client.consumer.consumer_config["authz_page"] = str(
- self.get_setting("OIDC_CALLBACK_PATH")
+ client = OIDCClient(self.op_name)
+ client.consumer.consumer_config["authz_page"] = self.get_setting(
+ "OIDC_CALLBACK_PATH"
)
redirect_uri = self.get_next_url(request, "next")
@@ -264,7 +186,7 @@ def post(self, request):
if sid:
try:
- client = OIDClient(self.op_name, session_id=sid)
+ client = OIDCClient(self.op_name, session_id=sid)
except Exception as e: # FIXME : Finer exception handling (KeyError,ParseError,CommunicationError)
logger.error("OIDC Logout call error when loading OIDC state: ")
logger.exception(e)
@@ -316,7 +238,7 @@ class OIDCBackChannelLogoutView(OIDCView):
http_method_names = ["post"]
- def logout_sessions_by_sid(self, client: OIDClient, sid: str, body):
+ def logout_sessions_by_sid(self, client: OIDCClient, sid: str, body):
validated_sid = client.consumer.backchannel_logout(
request_args={"logout_token": body}
)
@@ -326,7 +248,7 @@ def logout_sessions_by_sid(self, client: OIDClient, sid: str, body):
for session in sessions:
self._logout_session(session)
- def logout_sessions_by_sub(self, client: OIDClient, sub: str, body):
+ def logout_sessions_by_sub(self, client: OIDCClient, sub: str, body):
sessions = OIDCSession.objects.filter(sub=sub)
for session in sessions:
client.consumer.backchannel_logout(request_args={"logout_token": body})
@@ -350,10 +272,10 @@ def post(self, request):
sub = decoded.get("sub")
if sub:
# Authorization server wants to kill all sessions
- client = OIDClient(self.op_name)
+ client = OIDCClient(self.op_name)
self.logout_sessions_by_sub(client, sub, body)
elif sid:
- client = OIDClient(self.op_name, session_id=sid)
+ client = OIDCClient(self.op_name, session_id=sid)
try:
self.logout_sessions_by_sid(client, sid, body)
except InvalidSIDException as e:
@@ -385,7 +307,7 @@ class OIDCCallbackView(OIDCView):
def __init__(self, **kwargs):
super().__init__(**kwargs)
- self.general_cache_backend = OIDCCacheBackendForDjango(self.op_name)
+ self.engine = OIDCEngine(self.op_name)
def success_url(self, request):
# Pull the next url from the session or settings --we don't need to
@@ -408,54 +330,13 @@ def _introspect_access_token(self, access_token_jwt):
"""
Perform a cached intropesction call to extract claims from encoded jwt of the access_token
"""
- # FIXME: allow a non-cached mode by global settings
- access_token_claims = None
-
- # FIXME: in what case could we not have an access token available?
- # should we raise an error then?
- if access_token_jwt is not None:
- cache_key = self.general_cache_backend.generate_hashed_cache_key(
- access_token_jwt
- )
- try:
- access_token_claims = self.general_cache_backend["cache_key"]
- except KeyError:
- # CACHE MISS
-
- # RFC 7662: token introspection: ask SSO to validate and render the jwt as json
- # this means a slow web call
- request_args = {
- "token": access_token_jwt,
- "token_type_hint": "access_token",
- }
- client_auth_method = self.client.consumer.registration_response.get(
- "introspection_endpoint_auth_method", "client_secret_basic"
- )
- introspection = self.client.client_extension.do_token_introspection(
- request_args=request_args,
- authn_method=client_auth_method,
- endpoint=self.client.consumer.introspection_endpoint,
- )
- access_token_claims = introspection.to_dict()
-
- # store it in cache
- current = datetime.datetime.now().strftime("%s")
- if "exp" not in access_token_claims:
- raise RuntimeError("No expiry set on the access token.")
- access_token_expiry = access_token_claims["exp"]
- print("**********************")
- print(current)
- print(access_token_expiry)
- exp = int(access_token_expiry) - int(current)
- print(exp)
- self.general_cache_backend.set(cache_key, access_token_claims, exp)
- return access_token_claims
+ return
def get(self, request, *args, **kwargs):
super().get(request, *args, **kwargs)
try:
if "oidc_sid" in request.session:
- self.client = OIDClient(
+ self.client = OIDCClient(
self.op_name, session_id=request.session["oidc_sid"]
)
@@ -495,8 +376,8 @@ def get(self, request, *args, **kwargs):
tokens["access_token"] if "access_token" in tokens else None
)
- access_token_claims = self._introspect_access_token(
- access_token_jwt
+ access_token_claims = self.engine.introspect_access_token(
+ access_token_jwt, self.client
)
id_token_claims = (
@@ -508,7 +389,7 @@ def get(self, request, *args, **kwargs):
userinfo_claims = userinfo.to_dict()
# Call user hook
- user = self.call_get_user_function(
+ user = self.engine.call_get_user_function(
tokens={
"info_token_claims": userinfo_claims,
"access_token_jwt": access_token_jwt,
diff --git a/pyproject.toml b/pyproject.toml
index 8f02377..21c747b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,7 +13,7 @@ classifiers=["Topic :: Utilities",
"Intended Audience :: Developers",
"Environment :: Web Environment",
"Framework :: Django",
- "Development Status :: 5 - Production/Stable",
+ "Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
diff --git a/requirements-test.in b/requirements-test.in
index d1e700c..0f67c9e 100644
--- a/requirements-test.in
+++ b/requirements-test.in
@@ -6,4 +6,6 @@ sphinx-rtd-theme
sphinx-autobuild
isort
pre-commit
-selenium
+selenium>4.10.0
+djangorestframework>=3.15
+django-cors-headers
\ No newline at end of file
diff --git a/requirements-test.txt b/requirements-test.txt
index 0c73ee5..0e1b47f 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,15 +1,16 @@
#
-# This file is autogenerated by pip-compile with Python 3.12
+# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --output-file=requirements-test.txt requirements-test.in
#
-alabaster==0.7.16
+alabaster==0.7.13
# via sphinx
-anyio==4.3.0
+asgiref==3.8.1
# via
- # starlette
- # watchfiles
+ # -c requirements.txt
+ # django
+ # django-cors-headers
attrs==23.2.0
# via
# outcome
@@ -27,28 +28,36 @@ charset-normalizer==3.3.2
# via
# -c requirements.txt
# requests
-click==8.1.7
- # via uvicorn
colorama==0.4.6
# via sphinx-autobuild
distlib==0.3.8
# via virtualenv
+django==4.2.13
+ # via
+ # -c requirements.txt
+ # django-cors-headers
+ # djangorestframework
+django-cors-headers==4.3.1
+ # via -r requirements-test.in
+djangorestframework==3.15.1
+ # via -r requirements-test.in
docutils==0.19
# via
# sphinx
# sphinx-rtd-theme
+exceptiongroup==1.2.1
+ # via
+ # trio
+ # trio-websocket
filelock==3.14.0
# via virtualenv
h11==0.14.0
- # via
- # uvicorn
- # wsproto
+ # via wsproto
identify==2.5.36
# via pre-commit
idna==3.7
# via
# -c requirements.txt
- # anyio
# requests
# trio
imagesize==1.4.1
@@ -57,6 +66,8 @@ isort==5.13.2
# via -r requirements-test.in
jinja2==3.1.4
# via sphinx
+livereload==2.6.3
+ # via sphinx-autobuild
markupsafe==2.1.5
# via
# -c requirements.txt
@@ -69,7 +80,7 @@ packaging==24.0
# via sphinx
platformdirs==4.2.2
# via virtualenv
-pre-commit==3.7.1
+pre-commit==3.5.0
# via -r requirements-test.in
psycopg2==2.9.9
# via -r requirements-test.in
@@ -87,10 +98,12 @@ requests==2.32.1
# sphinx
selenium==4.21.0
# via -r requirements-test.in
-sniffio==1.3.1
+six==1.16.0
# via
- # anyio
- # trio
+ # -c requirements.txt
+ # livereload
+sniffio==1.3.1
+ # via trio
snowballstemmer==2.2.0
# via sphinx
sortedcontainers==2.4.0
@@ -101,26 +114,30 @@ sphinx==6.2.1
# sphinx-autobuild
# sphinx-rtd-theme
# sphinxcontrib-jquery
-sphinx-autobuild==2024.4.16
+sphinx-autobuild==2021.3.14
# via -r requirements-test.in
sphinx-rtd-theme==2.0.0
# via -r requirements-test.in
-sphinxcontrib-applehelp==1.0.8
+sphinxcontrib-applehelp==1.0.4
# via sphinx
-sphinxcontrib-devhelp==1.0.6
+sphinxcontrib-devhelp==1.0.2
# via sphinx
-sphinxcontrib-htmlhelp==2.0.5
+sphinxcontrib-htmlhelp==2.0.1
# via sphinx
sphinxcontrib-jquery==4.1
# via sphinx-rtd-theme
sphinxcontrib-jsmath==1.0.1
# via sphinx
-sphinxcontrib-qthelp==1.0.7
+sphinxcontrib-qthelp==1.0.3
# via sphinx
-sphinxcontrib-serializinghtml==1.1.10
+sphinxcontrib-serializinghtml==1.1.5
# via sphinx
-starlette==0.37.2
- # via sphinx-autobuild
+sqlparse==0.5.0
+ # via
+ # -c requirements.txt
+ # django
+tornado==6.4
+ # via livereload
trio==0.25.1
# via
# selenium
@@ -130,20 +147,15 @@ trio-websocket==0.11.1
typing-extensions==4.11.0
# via
# -c requirements.txt
+ # asgiref
# selenium
urllib3[socks]==2.2.1
# via
# -c requirements.txt
# requests
# selenium
-uvicorn==0.29.0
- # via sphinx-autobuild
virtualenv==20.26.2
# via pre-commit
-watchfiles==0.21.0
- # via sphinx-autobuild
-websockets==12.0
- # via sphinx-autobuild
wsproto==1.2.0
# via trio-websocket
diff --git a/requirements.txt b/requirements.txt
index a6da609..324c87d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
#
-# This file is autogenerated by pip-compile with Python 3.12
+# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --output-file=requirements.txt pyproject.toml
@@ -20,7 +20,7 @@ cryptography==42.0.7
# oic
defusedxml==0.7.1
# via oic
-django==5.0.6
+django==4.2.13
# via django-pyoidc (pyproject.toml)
future==1.0.0
# via pyjwkest
@@ -62,6 +62,7 @@ sqlparse==0.5.0
# via django
typing-extensions==4.11.0
# via
+ # asgiref
# pydantic
# pydantic-core
urllib3==2.2.1
diff --git a/tests/e2e/front_test_app/Dockerfile b/tests/e2e/front_test_app/Dockerfile
new file mode 100644
index 0000000..d5fbd86
--- /dev/null
+++ b/tests/e2e/front_test_app/Dockerfile
@@ -0,0 +1,9 @@
+FROM php:7.2-apache
+
+RUN sed -i 's/Listen 80/Listen 9999/' /etc/apache2/ports.conf
+RUN cat /etc/apache2/ports.conf
+RUN chmod 777 /var/run/apache2
+
+COPY *.js /var/www/html/
+COPY *.php /var/www/html/
+COPY *.css /var/www/html/
\ No newline at end of file
diff --git a/tests/e2e/front_test_app/README.md b/tests/e2e/front_test_app/README.md
new file mode 100644
index 0000000..debc575
--- /dev/null
+++ b/tests/e2e/front_test_app/README.md
@@ -0,0 +1,7 @@
+# Very simpel front app using OIDC
+
+Inspired by the Keycloak demo project (Apache2 License):
+
+* https://github.com/keycloak/keycloak-demo/tree/master/demo-app
+
+Used to test OIDC front interaction with our Django backend in API mode.
diff --git a/tests/e2e/front_test_app/app.js b/tests/e2e/front_test_app/app.js
new file mode 100644
index 0000000..359b5cd
--- /dev/null
+++ b/tests/e2e/front_test_app/app.js
@@ -0,0 +1,70 @@
+var keycloak = new Keycloak({
+ realm: 'realm1',
+ clientId: 'app1-front'
+});
+
+window.onload = async function () {
+ try {
+ var authenticated = await keycloak.init({
+ onLoad: 'check-sso',
+ checkLoginIframe: false
+ })
+ } catch (error) {
+ var output = document.getElementById('message');
+ output.innerHTML = 'Failed to initialize connexion with Keycloak SSO';
+ }
+ console.log(authenticated);
+ if (authenticated) {
+ userAuthenticated();
+ } else {
+ userNotAuthenticated();
+ }
+ document.getElementById('wrapper').style.display = 'block';
+}
+
+function userNotAuthenticated() {
+ document.getElementById('anon').style.display = 'block';
+ document.getElementById('authenticated').style.display = 'none';
+}
+
+function userAuthenticated() {
+ document.getElementById('anon').style.display = 'none';
+ document.getElementById('authenticated').style.display = 'block';
+ document.getElementById('message').innerHTML = 'User: ' + keycloak.tokenParsed['preferred_username'];
+ document.getElementById('debug').innerHTML = JSON.stringify(keycloak.tokenParsed);
+}
+
+async function request(endpoint) {
+ var req = new XMLHttpRequest();
+ var output = document.getElementById('message');
+ req.open('GET', backendUrl + '/' + endpoint, true);
+ req.setRequestHeader('Accept', 'application/json');
+
+ if (keycloak.authenticated) {
+ try {
+ success = await keycloak.updateToken(30)
+ output.innerHTML = 'Sending request with Bearer';
+ req.setRequestHeader('Authorization', 'Bearer ' + keycloak.token);
+ } catch (error) {
+ output.innerHTML = 'Failed to refresh user token';
+ };
+ }
+
+ req.onreadystatechange = function () {
+ if (req.readyState == 4) {
+ if (req.status == 200) {
+ output.innerHTML = 'Message: ' + JSON.stringify(req.responseText);
+ } else if (req.status == 403) {
+ output.innerHTML = 'Request Forbidden';
+ } else if (req.status == 0) {
+ output.innerHTML = 'Request failed';
+ } else {
+ output.innerHTML = '' + req.status + ' ' + req.statusText + '';
+ }
+ }
+ };
+
+ req.send();
+}
+
+keycloak.onAuthLogout = userNotAuthenticated;
diff --git a/tests/e2e/front_test_app/index.php b/tests/e2e/front_test_app/index.php
new file mode 100644
index 0000000..c481938
--- /dev/null
+++ b/tests/e2e/front_test_app/index.php
@@ -0,0 +1,42 @@
+
+
+
+
+ Keycloak Front App Example
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/e2e/front_test_app/styles.css b/tests/e2e/front_test_app/styles.css
new file mode 100644
index 0000000..65c93dd
--- /dev/null
+++ b/tests/e2e/front_test_app/styles.css
@@ -0,0 +1,86 @@
+body {
+ background-color: #333;
+ font-family: sans-serif;
+ font-size: 30px;
+}
+
+button {
+ font-family: sans-serif;
+ font-size: 30px;
+ width: 32%;
+
+ background-color: #0085cf;
+ background-image: linear-gradient(to bottom, #00a8e1 0%, #0085cf 100%);
+ background-repeat: repeat-x;
+
+ border: 2px solid #ccc;
+ color: #fff;
+
+ text-transform: uppercase;
+
+ -webkit-box-shadow: 2px 2px 10px 0px rgba(0,0,0,0.5);
+ -moz-box-shadow: 2px 2px 10px 0px rgba(0,0,0,0.5);
+ box-shadow: 2px 2px 10px 0px rgba(0,0,0,0.5);
+}
+
+button:hover {
+ background-color: #006ba6;
+ background-image: none;
+ -webkit-box-shadow: none;
+ -moz-box-shadow: none;
+ box-shadow: none;
+}
+
+hr {
+ border: none;
+ background-color: #eee;
+ height: 10px;
+}
+
+.menu {
+ padding: 10px;
+ margin-bottom: 10px;
+}
+
+.content {
+ background-color: #eee;
+ border: 1px solid #ccc;
+ padding: 10px;
+
+ -webkit-box-shadow: 2px 2px 10px 0px rgba(0,0,0,0.5);
+ -moz-box-shadow: 2px 2px 10px 0px rgba(0,0,0,0.5);
+ box-shadow: 2px 2px 10px 0px rgba(0,0,0,0.5);
+}
+
+.content .message {
+ padding: 20px;
+ margin-top: 10px;
+ background-color: #fff;
+ border: 1px solid #ccc;
+ font-size: 40px;
+ font-weight: bold;
+}
+
+.wrapper {
+ position: absolute;
+ height: 230px;
+ width: 640px;
+ left: 50%;
+ top: 50%;
+ -webkit-transform: translateX(-50%) translatey(-50%);
+}
+
+.error {
+ color: #a21e22;
+}
+
+.service {
+ background-color: #555;
+ color: #ccc;
+ font-size: 20px;
+ position: fixed;
+ bottom: 0px;
+ left: 0px;
+ right: 0px;
+ padding: 5px;
+}
\ No newline at end of file
diff --git a/tests/e2e/settings.py b/tests/e2e/settings.py
index 1113be2..1e90688 100644
--- a/tests/e2e/settings.py
+++ b/tests/e2e/settings.py
@@ -10,6 +10,8 @@
"django.contrib.contenttypes",
"django.contrib.auth",
"django.contrib.sessions",
+ "rest_framework",
+ "corsheaders",
"tests.e2e.test_app",
"django_pyoidc",
]
@@ -18,8 +20,10 @@
MIDDLEWARE = [
"django.contrib.sessions.middleware.SessionMiddleware",
+ "corsheaders.middleware.CorsMiddleware",
"django.middleware.common.CommonMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
+ "django.contrib.messages.middleware.MessageMiddleware",
]
TEMPLATES = [
@@ -50,22 +54,23 @@
"PORT": config("POSTGRES_PORT", default=5432),
}
}
-DJANGO_PYOIDC = {
- "test": {
- "OIDC_CALLBACK_PATH": "/callback",
- "OIDC_CLIENT_SECRET": "EnSAdFDlM78HejQ5EQATtlvXgRzfNww4",
- "OIDC_CLIENT_ID": "full",
- "OIDC_PROVIDER_DISCOVERY_URI": "http://oidc.test/auth/realms/Demo",
- "POST_LOGIN_URI_FAILURE": "http://oidc.test/",
- "POST_LOGOUT_REDIRECT_URI": "http://oidc.test/",
- "POST_LOGIN_URI_SUCCESS": "http://oidc.test/",
- "LOGIN_REDIRECTION_REQUIRES_HTTPS": False,
- "LOGIN_URIS_REDIRECT_ALLOWED_HOSTS": ["oidc.test"],
- "SCOPE": "full-dedicated",
- "CACHE_DJANGO_BACKEND": "default",
- }
+
+# DJANGO_PYOIDC settings are defined in tests overrides
+# we keep this one very short.
+DJANGO_PYOIDC = {}
+
+REST_FRAMEWORK = {
+ "DEFAULT_AUTHENTICATION_CLASSES": [
+ "django_pyoidc.drf.authentication.OIDCBearerAuthentication",
+ ],
+ # Use Django's standard `django.contrib.auth` permissions,
+ # or allow read-only access for unauthenticated users.
+ "DEFAULT_PERMISSION_CLASSES": ["rest_framework.permissions.DjangoModelPermissions"],
}
+CORS_ALLOWED_ORIGINS = [
+ "http://localhost:9999",
+]
ROOT_URLCONF = "tests.e2e.urls"
diff --git a/tests/e2e/test_app/__init__.py b/tests/e2e/test_app/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/tests/e2e/test_app/apps.py b/tests/e2e/test_app/apps.py
new file mode 100644
index 0000000..a0eb5dc
--- /dev/null
+++ b/tests/e2e/test_app/apps.py
@@ -0,0 +1,5 @@
+from django.apps import AppConfig
+
+
+class TestAppConfig(AppConfig):
+ name = "tests.e2e.test_app"
diff --git a/tests/e2e/test_app/models.py b/tests/e2e/test_app/models.py
new file mode 100644
index 0000000..58ed741
--- /dev/null
+++ b/tests/e2e/test_app/models.py
@@ -0,0 +1,8 @@
+from django.db import models
+
+
+class Public(models.Model):
+ data = models.TextField()
+
+ def __str__(self):
+ return self.data
diff --git a/tests/e2e/tests_keycloak.py b/tests/e2e/tests_keycloak.py
index 90c012f..2ff4dc9 100644
--- a/tests/e2e/tests_keycloak.py
+++ b/tests/e2e/tests_keycloak.py
@@ -1,5 +1,6 @@
import http.client as http_client
import logging
+import time
from urllib.parse import parse_qs, urlparse
import requests
@@ -10,12 +11,13 @@
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.webdriver import WebDriver
from selenium.webdriver.support import expected_conditions as EC
-
-# from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support.ui import WebDriverWait
from tests.e2e.utils import OIDCE2EKeycloakTestCase
+logger = logging.getLogger(__name__)
+
+
# HTTP debug for requests
http_client.HTTPConnection.debuglevel = 1
@@ -29,7 +31,8 @@ def setUpClass(cls):
# options.headless = True
profile = FirefoxProfile()
profile.set_preference("browser.privatebrowsing.autostart", True)
- cls.selenium = WebDriver(firefox_profile=profile, options=options)
+ options.profile = profile
+ cls.selenium = WebDriver(options=options)
# cls.selenium.implicitly_wait(10)
@classmethod
@@ -37,7 +40,93 @@ def tearDownClass(cls):
cls.selenium.quit()
super().tearDownClass()
- def test_00_login_page_redirects_to_keycloak_sso(self, *args):
+ def test_001_m2m_client_credential_success(self, *args):
+ """
+ Check that we can request the API using a 'service account'.
+
+ We use a 'service account', so we are an external M2M client
+ in this situation, we do not use the Django OIDC credentials
+ but the creds of a service account, as would be done by an external
+ application in a B2B call.
+ """
+
+ sso_url = (
+ "http://localhost:8080/auth/realms/realm1/protocol/openid-connect/token"
+ )
+ params = {
+ "client_id": "app_m2m1",
+ "client_secret": "secret_app-m2m1",
+ "grant_type": "client_credentials",
+ }
+ print("sending M2M Login Request")
+ response = requests.post(sso_url, data=params)
+ # failing the test in case of bad HTTP status
+ response.raise_for_status()
+
+ print("Auth success")
+ data = response.json()
+ access_token = data["access_token"]
+
+ api_url = f"{self.live_server_url}/api/users"
+ headers = {
+ "Authorization": f"Bearer {access_token}",
+ }
+ print("sending API Request with access token")
+ response = requests.get(api_url, headers=headers)
+ response.raise_for_status()
+ content = response.text
+ self.assertIn('"username":"service-account-app_m2m1"', content)
+
+ def test_002_m2m_client_credential_forbidden(self, *args):
+ """
+ Check that we can request the API using a 'service account' and be rejected.
+
+ The client used here does not have access to the same scope as the client
+ used in the API. THe group and roles are OK, but not the scope. So Keycloak
+ will not add the right audience in the token and we should be rejected.
+ """
+
+ sso_url = (
+ "http://localhost:8080/auth/realms/realm1/protocol/openid-connect/token"
+ )
+ params = {
+ "client_id": "app2_m2m2",
+ "client_secret": "secret_app2-m2m2",
+ "grant_type": "client_credentials",
+ }
+ print("sending M2M Login Request")
+ response = requests.post(sso_url, data=params)
+ # failing the test in case of bad HTTP status
+ response.raise_for_status()
+
+ print("Auth success")
+ data = response.json()
+ access_token = data["access_token"]
+
+ api_url = f"{self.live_server_url}/api/users"
+ headers = {
+ "Authorization": f"Bearer {access_token}",
+ }
+ print("sending API Request with access token, should get a 403 Forbidden.")
+ response = requests.get(api_url, headers=headers)
+ self.assertEqual(403, response.status_code)
+
+ def test_003_m2m_anonymous_api_access(self, *args):
+
+ # auth part is forbidden
+ api_url = f"{self.live_server_url}/api/users"
+ response = requests.get(api_url)
+ self.assertEqual(403, response.status_code)
+
+ # public part is OK
+ api_url = f"{self.live_server_url}/api/publics"
+ print("sending anonymous API Request.")
+ response = requests.get(api_url)
+ response.raise_for_status()
+ content = response.text
+ self.assertIn("[]", content)
+
+ def test_100_login_page_redirects_to_keycloak_sso(self, *args):
"""
Test that accessing login callback redirects to the SSO server.
"""
@@ -78,6 +167,31 @@ def test_00_login_page_redirects_to_keycloak_sso(self, *args):
self.assertTrue(qs["state"][0])
self.assertTrue(qs["nonce"][0])
+ def _selenium_front_sso_login(self, user, password):
+ front_url = "http://localhost:9999"
+ self.selenium.get(front_url)
+ WebDriverWait(self.selenium, 30).until(
+ EC.element_to_be_clickable((By.ID, "loginBtn"))
+ ).click()
+ # self.selenium.find_element(By.ID, "loginBtn").click()
+ self.wait.until(EC.url_changes(front_url))
+ username_input = self.selenium.find_element(By.NAME, "username")
+ username_input.send_keys(user)
+ password_input = self.selenium.find_element(By.NAME, "password")
+ password_input.send_keys(password)
+ self.selenium.find_element(By.ID, "kc-login").click()
+ self.wait.until(EC.url_matches(front_url))
+
+ def _selenium_front_logout(self):
+ front_url = "http://localhost:9999"
+ WebDriverWait(self.selenium, 30).until(
+ EC.element_to_be_clickable((By.ID, "logoutBtn"))
+ ).click()
+ self.wait.until(EC.url_matches(front_url))
+ bodyText = self.selenium.find_element(By.ID, "message").text
+ # check we are NOT logged in
+ self.assertEqual("", bodyText)
+
def _selenium_sso_login(
self,
login_start_url,
@@ -129,7 +243,7 @@ def _selenium_logout(self, end_url):
# check we are NOT logged in
self.assertTrue("You are logged out" in bodyText)
- def test_01_selenium_sso_login(self, *args):
+ def test_101_selenium_sso_login(self, *args):
"""
Test a complete working OIDC login/logout.
"""
@@ -174,7 +288,7 @@ def test_01_selenium_sso_login(self, *args):
self.assertTrue("OIDC-LOGIN-LINK" in bodyText)
self.assertFalse("OIDC-LOGOUT-LINK" in bodyText)
- def test_02_selenium_sso_login__relogin_and_logout(self, *args):
+ def test_102_selenium_sso_login__relogin_and_logout(self, *args):
"""
Test a login/logout session, adding a re-login on existing session in the middle
"""
@@ -246,7 +360,7 @@ def test_02_selenium_sso_login__relogin_and_logout(self, *args):
},
},
)
- def test_03_selenium_sso_session_with_callbacks(self, *args):
+ def test_103_selenium_sso_session_with_callbacks(self, *args):
timeout = 5
login_url = reverse("test_login")
success_url = reverse("test_success")
@@ -300,7 +414,7 @@ def test_03_selenium_sso_session_with_callbacks(self, *args):
},
},
)
- def test_04_selenium_sso_failed_login(self, *args):
+ def test_104_selenium_sso_failed_login(self, *args):
"""
Test a failed SSO login (bad client)
"""
@@ -341,7 +455,7 @@ def test_04_selenium_sso_failed_login(self, *args):
},
},
)
- def test_05_selenium_ressource_access_checks(self, *args):
+ def test_105_selenium_ressource_access_checks(self, *args):
"""
Check that a resource access check can be performed to prevent access for some users.
@@ -439,7 +553,7 @@ def test_05_selenium_ressource_access_checks(self, *args):
},
},
)
- def test_06_selenium_minimal_audience_checks(self, *args):
+ def test_106_selenium_minimal_audience_checks(self, *args):
"""
Check that a minimal audience check can be performed to prevent access for some users.
@@ -524,3 +638,41 @@ def test_06_selenium_minimal_audience_checks(self, *args):
# Check the session message is shown
self.assertTrue("message: user_app1_only@example.com is logged in." in bodyText)
self._selenium_logout(end_url)
+
+ def test_201_selenium_front_app_api_call(self, *args):
+ """
+ Check that a test front app can make an OIDC API call.
+ """
+ timeout = 60
+ self.wait = WebDriverWait(self.selenium, timeout)
+
+ # user1 login is OK, resource access success
+ self._selenium_front_sso_login("user1", "passwd1")
+ # let the page reloads after login fill the user session stuff
+ time.sleep(3)
+ bodyText = self.selenium.find_element(By.ID, "message").get_attribute(
+ "innerHTML"
+ )
+ self.assertTrue("User: user1" in bodyText)
+
+ WebDriverWait(self.selenium, 30).until(
+ EC.element_to_be_clickable((By.ID, "securedBtn"))
+ ).click()
+
+ # let the ajax stuff behave
+ time.sleep(3)
+ bodyText = self.selenium.find_element(By.ID, "message").get_attribute(
+ "innerHTML"
+ )
+ # logger.error(bodyText)
+ self.assertTrue("user1@example.com" in bodyText)
+
+ # FIXME: there a selenium issue in the logout btn selection...
+ # self._selenium_front_logout()
+ #
+ # # After logout, launch unauthorized ajax call
+ # WebDriverWait(self.selenium, 30).until(EC.element_to_be_clickable((By.ID, "securedBtn"))).click()
+ # # let the ajax stuff behave
+ # time.sleep(3)
+ # bodyText = self.selenium.find_element(By.ID, "message").get_attribute('innerHTML')
+ # self.assertTrue("Request Forbidden" in bodyText)
diff --git a/tests/e2e/urls.py b/tests/e2e/urls.py
index 72e3f47..6f385de 100644
--- a/tests/e2e/urls.py
+++ b/tests/e2e/urls.py
@@ -1,4 +1,7 @@
-from django.urls import path
+from django.contrib.auth.models import User
+from django.urls import include, path
+from rest_framework import routers, serializers, viewsets
+from rest_framework.permissions import AllowAny
from django_pyoidc.views import (
OIDCBackChannelLogoutView,
@@ -6,12 +9,46 @@
OIDCLoginView,
OIDCLogoutView,
)
+from tests.e2e.test_app.models import Public
from tests.e2e.test_app.views import (
OIDCTestFailureView,
OIDCTestLogoutView,
OIDCTestSuccessView,
)
+
+# DRF tests ----
+# Serializers define the API representation.
+class UserSerializer(serializers.HyperlinkedModelSerializer):
+ class Meta:
+ model = User
+ fields = ["url", "username", "email", "is_staff"]
+
+
+class PublicSerializer(serializers.HyperlinkedModelSerializer):
+ class Meta:
+ model = Public
+ fields = ["data"]
+
+
+# ViewSets define the view behavior.
+class UserViewSet(viewsets.ModelViewSet):
+ queryset = User.objects.all()
+ serializer_class = UserSerializer
+
+
+# ViewSets define the view behavior.
+class PublicViewSet(viewsets.ModelViewSet):
+ queryset = Public.objects.all()
+ permission_classes = [AllowAny]
+ serializer_class = PublicSerializer
+
+
+# Routers provide an easy way of automatically determining the URL conf.
+apirouter = routers.DefaultRouter()
+apirouter.register(r"users", UserViewSet)
+apirouter.register(r"publics", PublicViewSet)
+
urlpatterns = [
path("login/", OIDCLoginView.as_view(op_name="sso1"), name="test_login"),
path(
@@ -44,4 +81,5 @@
OIDCTestFailureView.as_view(op_name="sso1"),
name="test_failure",
),
+ path("api/", include(apirouter.urls)),
]
diff --git a/tests/e2e/utils.py b/tests/e2e/utils.py
index ec129f5..83e8af0 100644
--- a/tests/e2e/utils.py
+++ b/tests/e2e/utils.py
@@ -210,11 +210,14 @@ def loadLemonLDAPFixtures(cls):
print(" + Create client applications.")
cls.registerClient("app1", "secret_app1", cls.live_server_url)
cls.registerClient(
- "app1-api", "secret_app1-api", cls.live_server_url, bearerOnly=True
+ "app1-api",
+ "secret_app1-api",
+ cls.live_server_url,
+ bearerOnly=True,
)
- cls.registerClient("app2-foo", "secret_app2-foo", cls.live_server_url)
+ cls.registerClient("app2-full", "secret_app2-full", cls.live_server_url)
cls.registerClient(
- "app2-bar", "secret_app2-bar", cls.live_server_url, bearerOnly=True
+ "app2-api", "secret_app2-api", cls.live_server_url, bearerOnly=True
)
# Default demo users:
# rtyler :: rtyler
@@ -316,6 +319,7 @@ def registerClient(cls, name, secret, url, bearerOnly=False):
STATIC_URL="/static",
MIDDLEWARE=[
"django.contrib.sessions.middleware.SessionMiddleware",
+ "corsheaders.middleware.CorsMiddleware",
"django.middleware.common.CommonMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
@@ -347,6 +351,13 @@ def registerClient(cls, name, secret, url, bearerOnly=False):
"LOGIN_URIS_REDIRECT_ALLOWED_HOSTS": ["testserver"],
"LOGIN_REDIRECTION_REQUIRES_HTTPS": False,
},
+ "apisso": {
+ "OIDC_CLIENT_ID": "app1-api",
+ "CACHE_DJANGO_BACKEND": "default",
+ "OIDC_PROVIDER_DISCOVERY_URI": "http://localhost:8080/auth/realms/realm1",
+ "OIDC_CLIENT_SECRET": "secret_app1-api",
+ "USED_BY_REST_FRAMEWORK": True,
+ },
},
)
class OIDCE2EKeycloakTestCase(OIDCE2ETestCase):
@@ -386,6 +397,31 @@ def setUpClass(cls):
)
cls.docker_id = res.stdout.partition("\n")[0]
print(cls.docker_id)
+
+ cls.docker_front_workdir = f"{cls.workdir}/tests/e2e/front_test_app"
+ os.chdir(cls.docker_front_workdir)
+ print("Building front docker image...")
+ subprocess.run(
+ [
+ "docker",
+ "build",
+ "-f",
+ "Dockerfile",
+ "-t",
+ "oidc-test-front-image",
+ ".",
+ ]
+ )
+ command = (
+ "docker run"
+ " --detach --rm -it"
+ " -p 9999:9999"
+ f" -e KEYCLOAK_URL=http://localhost:8080/auth -e BACKEND_URL={cls.live_server_url}"
+ " oidc-test-front-image"
+ )
+ subprocess.run(
+ command, shell=True, text=True, check=True, capture_output=True
+ )
except subprocess.CalledProcessError as e:
print(
f"Error while launching keycloak docker container. errcode: {e.returncode}."
@@ -398,11 +434,14 @@ def setUpClass(cls):
print(" +----------\ /--------------------------+ ") # noqa
print(" +-----------\/---------------------------+ ") # noqa
print(
- " + Try removing any previous Keycloak image running using this command:"
+ " + Try removing any previous Keycloak and front images running using these commands:"
)
print(
' docker stop $(docker ps -a -q --filter ancestor=oidc-test-keycloak-image --format="{{.ID}}")'
)
+ print(
+ ' docker stop $(docker ps -a -q --filter ancestor=oidc-test-front-image --format="{{.ID}}")'
+ )
print(" + Check also you have no service running on localhost:8080.")
print(" +---------------------------------------+ ")
print(" +---------------------------------------+ ")
@@ -448,26 +487,55 @@ def loadKeycloakFixtures(cls):
)
print(" + Create client applications.")
- app1_id = cls.registerClient("app1", "secret_app1", cls.live_server_url)
+ app1_id = cls.registerClient(
+ "app1",
+ "secret_app1",
+ cls.live_server_url,
+ serviceAccount=False,
+ channelLogoutUrl=f"{cls.live_server_url}/oidc/backchannel-logout",
+ )
app1_api_id = cls.registerClient(
- "app1-api", "secret_app1-api", cls.live_server_url, bearerOnly=True
+ "app1-api",
+ "secret_app1-api",
+ cls.live_server_url,
+ bearerOnly=True,
+ serviceAccount=False,
)
app1_front_id = cls.registerClient(
- "app1-front", None, cls.live_server_url, bearerOnly=False
+ "app1-front",
+ None,
+ "http://localhost:9999",
+ bearerOnly=False,
+ serviceAccount=False,
+ channelLogoutUrl="http://localhost:9999",
)
- app2_foo_id = cls.registerClient(
- "app2-foo", "secret_app2-foo", cls.live_server_url
+ app_m2m1_id = cls.registerClient(
+ "app_m2m1",
+ "secret_app-m2m1",
+ cls.live_server_url,
+ bearerOnly=False,
+ serviceAccount=True,
)
- app2_bar_id = cls.registerClient(
- "app2-bar", "secret_app2-bar", cls.live_server_url, bearerOnly=True
+ app2_m2m2_id = cls.registerClient(
+ "app2_m2m2",
+ "secret_app2-m2m2",
+ cls.live_server_url,
+ bearerOnly=False,
+ serviceAccount=True,
+ )
+ app2_full_id = cls.registerClient(
+ "app2-full", "secret_app2-full", cls.live_server_url, bearerOnly=False
+ )
+ app2_api_id = cls.registerClient(
+ "app2-api", "secret_app2-api", cls.live_server_url, bearerOnly=True
)
print(" + Create client applications access roles.")
app1_role = cls.registerClientRole(app1_id, "AccessApp1")
app1_bis_role = cls.registerClientRole(app1_api_id, "AccessApp1API")
app1_front_role = cls.registerClientRole(app1_front_id, "AccessApp1Front")
- app2_foo_role = cls.registerClientRole(app2_foo_id, "AccessApp2Foo")
- app2_bar_role = cls.registerClientRole(app2_bar_id, "AccessApp2Bar")
+ app2_full_role = cls.registerClientRole(app2_full_id, "AccessApp2Full")
+ app2_api_role = cls.registerClientRole(app2_api_id, "AccessApp2API")
print(" + Create Client Scopes.")
id_zone_app1 = cls.registerClientScope(
@@ -481,8 +549,8 @@ def loadKeycloakFixtures(cls):
id_zone_app2 = cls.registerClientScope(
"zone-app2",
[
- {app2_foo_id: app2_foo_role},
- {app2_bar_id: app2_bar_role},
+ {app2_full_id: app2_full_role},
+ {app2_api_id: app2_api_role},
],
)
@@ -490,8 +558,10 @@ def loadKeycloakFixtures(cls):
cls.addClientScopeForClient(app1_id, id_zone_app1)
cls.addClientScopeForClient(app1_api_id, id_zone_app1)
cls.addClientScopeForClient(app1_front_id, id_zone_app1)
- cls.addClientScopeForClient(app2_foo_id, id_zone_app2)
- cls.addClientScopeForClient(app2_bar_id, id_zone_app2)
+ cls.addClientScopeForClient(app_m2m1_id, id_zone_app1)
+ cls.addClientScopeForClient(app2_full_id, id_zone_app2)
+ cls.addClientScopeForClient(app2_api_id, id_zone_app2)
+ cls.addClientScopeForClient(app2_m2m2_id, id_zone_app2)
print(" + Create Groups.")
gApp1 = cls.registerGroup(
@@ -505,8 +575,8 @@ def loadKeycloakFixtures(cls):
gApp2 = cls.registerGroup(
"App2",
[
- {"app2-foo": "AccessApp2Foo"},
- {"app2-bar": "AccessApp2Bar"},
+ {"app2-full": "AccessApp2Full"},
+ {"app2-api": "AccessApp2API"},
],
)
gApp1Restricted = cls.registerGroup(
@@ -515,16 +585,30 @@ def loadKeycloakFixtures(cls):
{"app1": "AccessApp1"},
],
)
+ gm2m = cls.registerGroup(
+ "GroupM2M",
+ [
+ {"app1": "AccessApp1"},
+ {"app1-api": "AccessApp1API"},
+ {"app2-full": "AccessApp2Full"},
+ {"app2-api": "AccessApp2API"},
+ ],
+ )
gAppAll = cls.registerGroup(
"AllApps",
[
{"app1": "AccessApp1"},
{"app1-api": "AccessApp1API"},
{"app1-front": "AccessApp1Front"},
- {"app2-foo": "AccessApp2Foo"},
- {"app2-bar": "AccessApp2Bar"},
+ {"app2-full": "AccessApp2Full"},
+ {"app2-api": "AccessApp2API"},
],
)
+ print(" + Link service account users to groups")
+ m2m_user1 = cls.searchUser("service-account-app_m2m1")
+ m2m_user2 = cls.searchUser("service-account-app2_m2m2")
+ cls.add_user_to_group(m2m_user1, gm2m)
+ cls.add_user_to_group(m2m_user2, gm2m)
print(" + Create users.")
cls.registerUser(
@@ -584,7 +668,20 @@ def tearDownClass(cls):
print(f"Error stopping keycloak container: {e.returncode}.")
print(e.stderr)
finally:
- os.chdir(cls.workdir)
+ os.chdir(cls.docker_front_workdir)
+ print("Removing front docker image...")
+ try:
+ cmd = (
+ "docker stop $("
+ 'docker ps -a -q --filter ancestor=oidc-test-front-image --format="{{.ID}}"'
+ ")"
+ )
+ subprocess.run(cmd, shell=True, text=True, check=True)
+ except subprocess.CalledProcessError as e:
+ print(f"Error stopping front container: {e.returncode}.")
+ print(e.stderr)
+ finally:
+ os.chdir(cls.workdir)
@classmethod
def docker_keycloak_command(cls, command: str):
@@ -626,17 +723,73 @@ def docker_keycloak_command(cls, command: str):
return output
@classmethod
- def registerClient(cls, name, secret, url, bearerOnly=False):
- redirectUris = "[ ]" if bearerOnly else f'[ "{url}/*" ]'
+ def add_user_to_group(cls, user_id, group):
+ cls.docker_keycloak_command(
+ f"bin/kcadm.sh update users/{user_id}/groups/{group} -r realm1"
+ )
+
+ @classmethod
+ def searchUser(cls, user_name):
+ output = cls.docker_keycloak_command(
+ "bin/kcadm.sh get users -r realm1"
+ f" -q username={user_name} --fields 'id,username'"
+ )
+ id = None
+ cpt = 0
+ prevline = ""
+ for line in output.stdout.splitlines():
+ if line == f' "username" : "{user_name}"':
+ cpt = cpt - 1
+ id_line = prevline
+ # line looks like ' "id": "42562-..-45646",'
+ # we split on " and get 3rd value
+ id = id_line.split('"')[3]
+ break
+ cpt += 1
+ prevline = line
+ return id
+
+ @classmethod
+ def registerClient(
+ cls,
+ name,
+ secret,
+ url,
+ bearerOnly=False,
+ serviceAccount=False,
+ channelLogoutUrl="",
+ ):
+ if serviceAccount:
+ bServiceAccountEnabled = "true"
+ redirectUris = "[ ]"
+ extraAttributes = (
+ '"use.refresh.tokens": "true",'
+ '"client_credentials.use_refresh_token": "false",'
+ '"tls.client.certificate.bound.access.tokens": "false",'
+ '"require.pushed.authorization.requests": "false",'
+ '"acr.loa.map": "{}",'
+ '"token.response.type.bearer.lower-case": "false",'
+ )
+ backchannelLogoutUrl = ""
+ frontchannelLogoutUrl = ""
+ else:
+
+ bServiceAccountEnabled = "false"
+ redirectUris = "[ ]" if bearerOnly else f'[ "{url}/*" ]'
+ extraAttributes = ""
bBearerOnly = "true" if bearerOnly else "false"
bStandardFlowEnabled = "false" if bearerOnly else "true"
if secret is None:
public_line = '"publicClient" : true,'
secret_line = ""
frontch_line = '"frontchannelLogout" : true,'
+ backchannelLogoutUrl = ""
+ frontchannelLogoutUrl = channelLogoutUrl
else:
public_line = '"publicClient" : false,'
frontch_line = '"frontchannelLogout" : false,'
+ frontchannelLogoutUrl = ""
+ backchannelLogoutUrl = channelLogoutUrl
secret_line = (
f'"clientAuthenticatorType" : "client-secret", "secret" : "{secret}",'
)
@@ -661,17 +814,20 @@ def registerClient(cls, name, secret, url, bearerOnly=False):
"standardFlowEnabled" : {bStandardFlowEnabled},
"implicitFlowEnabled" : false,
"directAccessGrantsEnabled" : false,
- "serviceAccountsEnabled" : false,
+ "serviceAccountsEnabled" : {bServiceAccountEnabled},
{public_line}
{frontch_line}
"protocol" : "openid-connect",
"attributes" : {{
+ {extraAttributes}
"oidc.ciba.grant.enabled" : "false",
- "backchannel.logout.session.required" : "false",
"post.logout.redirect.uris" : "+",
"display.on.consent.screen" : "false",
"oauth2.device.authorization.grant.enabled" : "false",
- "backchannel.logout.revoke.offline.tokens" : "false"
+ "backchannel.logout.revoke.offline.tokens" : "false",
+ "backchannel.logout.session.required" : "false",
+ "backchannel.logout.url": "{backchannelLogoutUrl}",
+ "frontchannel.logout.url": "{frontchannelLogoutUrl}"
}},
"authenticationFlowBindingOverrides" : {{ }},
"fullScopeAllowed" : false,
@@ -741,7 +897,7 @@ def registerClientScope(cls, client_scope, role_mappings=[]):
@classmethod
def registerGroup(cls, name, role_mappings=[]):
output = cls.docker_keycloak_command(
- "bin/kcadm.sh create groups" " -r realm1" f" -s name={name}"
+ "bin/kcadm.sh create groups -r realm1" f" -s name={name}"
)
# output was in the form
# "Created new group with id '0f5d0645-a7a7-4e88-adf8-b9e568025f5c'""
diff --git a/tests/tests_cache.py b/tests/tests_cache.py
new file mode 100644
index 0000000..17bcddd
--- /dev/null
+++ b/tests/tests_cache.py
@@ -0,0 +1,70 @@
+from unittest import mock
+from unittest.mock import call
+
+from django_pyoidc.client import OIDCClient
+from django_pyoidc.utils import OIDCCacheBackendForDjango
+from tests.utils import OIDCTestCase
+
+
+class CacheTestCase(OIDCTestCase):
+ @mock.patch("django_pyoidc.client.Consumer.provider_config")
+ def test_providers_info_not_cached(self, mocked_provider_config):
+ """
+ Test that multiple Clients creation without cache means several provider_info calls
+ """
+ # OIDCClient creation generates one call to Consumer.provider_config
+ sso1 = OIDCClient(op_name="sso1")
+ mocked_provider_config.assert_has_calls([call("")])
+ assert 1 == mocked_provider_config.call_count
+
+ # restoring a user session does not create a new call
+ sso1.consumer._backup(sid="1234")
+ OIDCClient(op_name="sso1", session_id="1234")
+
+ mocked_provider_config.assert_has_calls([call("")])
+ assert 1 == mocked_provider_config.call_count
+
+ # but a new empty Client would add a new metadata call
+ OIDCClient(op_name="sso1")
+ mocked_provider_config.assert_has_calls([call(""), call("")])
+ assert 2 == mocked_provider_config.call_count
+
+ @mock.patch(
+ "django_pyoidc.client.Consumer.provider_config",
+ return_value=('[{"foo": "bar"}]'),
+ )
+ def test_providers_info_cached(self, mocked_provider_config):
+ """
+ Test that multiple Clients creation with cache means only one provider_info call
+ """
+
+ # empty the caches
+ cache1 = OIDCCacheBackendForDjango("sso3")
+ cache1.clear()
+ cache2 = OIDCCacheBackendForDjango("sso4")
+ cache2.clear()
+
+ # OIDCClient creation generates one call to Consumer.provider_config
+ sso1 = OIDCClient(op_name="sso3")
+ mocked_provider_config.assert_has_calls([call("http://sso3/uri")])
+ assert 1 == mocked_provider_config.call_count
+
+ # restoring a user session does not crteate a new call
+ sso1.consumer._backup(sid="1234")
+ OIDCClient(op_name="sso3", session_id="1234")
+ mocked_provider_config.assert_has_calls([call("http://sso3/uri")])
+ assert 1 == mocked_provider_config.call_count
+
+ # creating a new OIDCClient should activate shared cache
+ # and prevent a new call
+ OIDCClient(op_name="sso3")
+ mocked_provider_config.assert_has_calls([call("http://sso3/uri")])
+ assert 1 == mocked_provider_config.call_count
+
+ # BUT adding a new Client with a different op_name will add a call,
+ # as it is not the same cache key
+ OIDCClient(op_name="sso4")
+ mocked_provider_config.assert_has_calls(
+ [call("http://sso3/uri"), call("http://sso4/uri")]
+ )
+ assert 2 == mocked_provider_config.call_count
diff --git a/tests/tests_session.py b/tests/tests_session.py
index 017cd67..d23d913 100644
--- a/tests/tests_session.py
+++ b/tests/tests_session.py
@@ -1,19 +1,19 @@
from unittest import mock
from unittest.mock import call
+from django_pyoidc.client import OIDCClient
from django_pyoidc.utils import OIDCCacheBackendForDjango
-from django_pyoidc.views import OIDClient
from tests.utils import OIDCTestCase
class SessionTestCase(OIDCTestCase):
- @mock.patch("django_pyoidc.views.Consumer.provider_config")
+ @mock.patch("django_pyoidc.client.Consumer.provider_config")
def test_session_isolation_between_providers(self, mocked_provider_config):
"""
Test that different SSO providers using same SID do not conflict
"""
- sso1 = OIDClient(op_name="sso1")
- sso2 = OIDClient(op_name="sso2")
+ sso1 = OIDCClient(op_name="sso1")
+ sso2 = OIDCClient(op_name="sso2")
mocked_provider_config.assert_has_calls([call(""), call("")])
assert 2 == mocked_provider_config.call_count
@@ -25,10 +25,10 @@ def test_session_isolation_between_providers(self, mocked_provider_config):
mocked_provider_config.assert_has_calls([call(""), call("")])
assert 2 == mocked_provider_config.call_count
- client1 = OIDClient(op_name="sso1", session_id="1234")
+ client1 = OIDCClient(op_name="sso1", session_id="1234")
self.assertEqual(client1.consumer.client_id, "1")
- client2 = OIDClient(op_name="sso2", session_id="1234")
+ client2 = OIDCClient(op_name="sso2", session_id="1234")
# no more calls
mocked_provider_config.assert_has_calls([call(""), call("")])
@@ -37,7 +37,7 @@ def test_session_isolation_between_providers(self, mocked_provider_config):
self.assertEqual(client2.consumer.client_id, "2")
@mock.patch(
- "django_pyoidc.views.Consumer.provider_config",
+ "django_pyoidc.client.Consumer.provider_config",
return_value=('[{"foo": "bar"}]'),
)
def test_session_isolation_between_providers_cached(self, mocked_provider_config):
@@ -51,11 +51,11 @@ def test_session_isolation_between_providers_cached(self, mocked_provider_config
cache1.clear()
cache2.clear()
- sso1 = OIDClient(op_name="sso3")
+ sso1 = OIDCClient(op_name="sso3")
mocked_provider_config.assert_has_calls([call("http://sso3/uri")])
assert 1 == mocked_provider_config.call_count
- sso2 = OIDClient(op_name="sso4")
+ sso2 = OIDCClient(op_name="sso4")
mocked_provider_config.assert_has_calls(
[call("http://sso3/uri"), call("http://sso4/uri")]
)
@@ -64,10 +64,10 @@ def test_session_isolation_between_providers_cached(self, mocked_provider_config
sso1.consumer._backup(sid="1234")
sso2.consumer._backup(sid="1234")
- client1 = OIDClient(op_name="sso3", session_id="1234")
+ client1 = OIDCClient(op_name="sso3", session_id="1234")
self.assertEqual(client1.consumer.client_id, "3")
- client2 = OIDClient(op_name="sso4", session_id="1234")
+ client2 = OIDCClient(op_name="sso4", session_id="1234")
self.assertEqual(client2.consumer.client_id, "4")
mocked_provider_config.assert_has_calls(
diff --git a/tests/tests_views.py b/tests/tests_views.py
index 4e68b80..a93c9c6 100644
--- a/tests/tests_views.py
+++ b/tests/tests_views.py
@@ -9,17 +9,17 @@
from oic.oic import IdToken
from oic.oic.message import OpenIDSchema
+from django_pyoidc.client import OIDCClient
from django_pyoidc.models import OIDCSession
-from django_pyoidc.views import OIDClient
from tests.utils import OIDCTestCase
SessionStore = import_module(settings.SESSION_ENGINE).SessionStore
class LoginViewTestCase(OIDCTestCase):
- @mock.patch("django_pyoidc.views.Consumer.provider_config")
+ @mock.patch("django_pyoidc.client.Consumer.provider_config")
@mock.patch(
- "django_pyoidc.views.Consumer.begin",
+ "django_pyoidc.client.Consumer.begin",
return_value=(1234, "https://sso.notatld"),
)
def test_redirect_uri_management_no_next_params(self, *args):
@@ -38,9 +38,9 @@ def test_redirect_uri_management_no_next_params(self, *args):
settings.DJANGO_PYOIDC["sso1"]["POST_LOGIN_URI_SUCCESS"],
)
- @mock.patch("django_pyoidc.views.Consumer.provider_config")
+ @mock.patch("django_pyoidc.client.Consumer.provider_config")
@mock.patch(
- "django_pyoidc.views.Consumer.begin",
+ "django_pyoidc.client.Consumer.begin",
return_value=(1234, "https://sso.notatld"),
)
def test_redirect_uri_management_next_to_samesite(self, *args):
@@ -64,9 +64,9 @@ def test_redirect_uri_management_next_to_samesite(self, *args):
"https://test.django-pyoidc.notatld/myview/details",
)
- @mock.patch("django_pyoidc.views.Consumer.provider_config")
+ @mock.patch("django_pyoidc.client.Consumer.provider_config")
@mock.patch(
- "django_pyoidc.views.Consumer.begin",
+ "django_pyoidc.client.Consumer.begin",
return_value=(1234, "https://sso.notatld"),
)
def test_redirect_uri_management_next_follows_https_requires(self, *args):
@@ -90,9 +90,9 @@ def test_redirect_uri_management_next_follows_https_requires(self, *args):
settings.DJANGO_PYOIDC["sso1"]["POST_LOGIN_URI_SUCCESS"],
)
- @mock.patch("django_pyoidc.views.Consumer.provider_config")
+ @mock.patch("django_pyoidc.client.Consumer.provider_config")
@mock.patch(
- "django_pyoidc.views.Consumer.begin",
+ "django_pyoidc.client.Consumer.begin",
return_value=(1234, "https://sso.notatld"),
)
def test_redirect_uri_management_next_to_disallowed_site(self, *args):
@@ -106,7 +106,7 @@ def test_redirect_uri_management_next_to_disallowed_site(self, *args):
)
self.assertEqual(response.status_code, 400)
- @mock.patch("django_pyoidc.views.Consumer.provider_config")
+ @mock.patch("django_pyoidc.client.Consumer.provider_config")
def test_oidc_session_is_saved(self, *args):
"""
Test that the OIDC client is saved on login request, and that the returned session ID allows us to restore the client
@@ -123,7 +123,7 @@ def test_oidc_session_is_saved(self, *args):
self.assertEqual(response.status_code, 302)
sid = self.client.session["oidc_sid"]
self.assertIsNotNone(sid)
- client = OIDClient(op_name="sso1", session_id=sid)
+ client = OIDCClient(op_name="sso1", session_id=sid)
self.assertEqual(client.consumer.client_id, "1")
@@ -156,9 +156,9 @@ def test_django_user_is_at_least_logged_out(self):
SESSION_KEY in self.client.session
) # from https://stackoverflow.com/a/6013115
- @mock.patch("django_pyoidc.views.Consumer.restore")
+ @mock.patch("django_pyoidc.client.Consumer.restore")
@mock.patch(
- "django_pyoidc.views.Consumer.request_info",
+ "django_pyoidc.client.Consumer.request_info",
return_value=("http://example.com", "", "", ""),
)
def test_logout_generates_oidc_request_to_sso(
@@ -203,10 +203,10 @@ def test_callback_but_no_sid_on_our_side(self):
self.assertRedirects(response, "/logout_failure", fetch_redirect_response=False)
@mock.patch(
- "django_pyoidc.views.Consumer.parse_authz",
+ "django_pyoidc.client.Consumer.parse_authz",
return_value=({"state": ""}, None, None),
)
- @mock.patch("django_pyoidc.views.Consumer.restore")
+ @mock.patch("django_pyoidc.client.Consumer.restore")
def test_callback_but_state_mismatch(self, mocked_restore, mocked_parse_authz):
"""
Test that receiving a callback with a wrong state parameter results in an HTTP 4XX error
@@ -225,18 +225,18 @@ def test_callback_but_state_mismatch(self, mocked_restore, mocked_parse_authz):
mocked_parse_authz.assert_called_once()
@mock.patch(
- "django_pyoidc.views.Consumer.parse_authz",
+ "django_pyoidc.client.Consumer.parse_authz",
return_value=({"state": "test_id_12345"}, None, None),
)
- @mock.patch("django_pyoidc.views.get_user_by_email", return_value=None)
+ @mock.patch("django_pyoidc.engine.get_user_by_email", return_value=None)
@mock.patch(
- "django_pyoidc.views.Consumer.get_user_info", return_value=OpenIDSchema()
+ "django_pyoidc.client.Consumer.get_user_info", return_value=OpenIDSchema()
)
@mock.patch(
- "django_pyoidc.views.Consumer.complete",
+ "django_pyoidc.client.Consumer.complete",
return_value={"id_token": IdToken(iss="fake")},
)
- @mock.patch("django_pyoidc.views.Consumer.restore")
+ @mock.patch("django_pyoidc.client.Consumer.restore")
def test_callback_no_session_state_provided_invalid_user(
self,
mocked_restore,
@@ -262,23 +262,30 @@ def test_callback_no_session_state_provided_invalid_user(
mocked_complete.assert_called_once_with(state=state, session_state=None)
mocked_parse_authz.assert_called_once()
mocked_get_user_info.assert_called_once_with(state=state)
- mocked_get_user.assert_called_once_with({}, {"iss": "fake"})
+ mocked_get_user.assert_called_once_with(
+ {
+ "info_token_claims": {},
+ "access_token_jwt": None,
+ "access_token_claims": None,
+ "id_token_claims": {"iss": "fake"},
+ }
+ )
self.assertRedirects(response, "/logout_failure", fetch_redirect_response=False)
self.assertEqual(OIDCSession.objects.all().count(), 0)
@mock.patch("django_pyoidc.views.OIDCView.call_callback_function")
@mock.patch(
- "django_pyoidc.views.Consumer.parse_authz",
+ "django_pyoidc.client.Consumer.parse_authz",
return_value=({"state": "test_id_12345"}, None, None),
)
- @mock.patch("django_pyoidc.views.get_user_by_email")
- @mock.patch("django_pyoidc.views.Consumer.get_user_info")
+ @mock.patch("django_pyoidc.engine.get_user_by_email")
+ @mock.patch("django_pyoidc.client.Consumer.get_user_info")
@mock.patch(
- "django_pyoidc.views.Consumer.complete",
+ "django_pyoidc.client.Consumer.complete",
return_value={"id_token": IdToken(iss="fake")},
)
- @mock.patch("django_pyoidc.views.Consumer.restore")
+ @mock.patch("django_pyoidc.client.Consumer.restore")
def test_callback_no_session_state_provided_valid_user(
self,
mocked_restore,
@@ -315,7 +322,14 @@ def test_callback_no_session_state_provided_valid_user(
mocked_parse_authz.assert_called_once()
mocked_get_user_info.assert_called_once_with(state=state)
- mocked_get_user.assert_called_once_with(user_info_dict, {"iss": "fake"})
+ mocked_get_user.assert_called_once_with(
+ {
+ "info_token_claims": user_info_dict,
+ "access_token_jwt": None,
+ "access_token_claims": None,
+ "id_token_claims": {"iss": "fake"},
+ }
+ )
self.assertRedirects(
response, "/default/success", fetch_redirect_response=False
@@ -332,14 +346,14 @@ def test_callback_no_session_state_provided_valid_user(
mocked_call_callback_function.assert_called_once()
@mock.patch("django_pyoidc.views.OIDCView.call_callback_function")
- @mock.patch("django_pyoidc.views.Consumer.parse_authz")
- @mock.patch("django_pyoidc.views.get_user_by_email")
- @mock.patch("django_pyoidc.views.Consumer.get_user_info")
+ @mock.patch("django_pyoidc.client.Consumer.parse_authz")
+ @mock.patch("django_pyoidc.engine.get_user_by_email")
+ @mock.patch("django_pyoidc.client.Consumer.get_user_info")
@mock.patch(
- "django_pyoidc.views.Consumer.complete",
+ "django_pyoidc.client.Consumer.complete",
return_value={"id_token": IdToken(iss="fake")},
)
- @mock.patch("django_pyoidc.views.Consumer.restore")
+ @mock.patch("django_pyoidc.client.Consumer.restore")
def test_callback_with_session_state_provided_valid_user(
self,
mocked_restore,
@@ -382,7 +396,14 @@ def test_callback_with_session_state_provided_valid_user(
mocked_parse_authz.assert_called_once()
mocked_get_user_info.assert_called_once_with(state=state)
- mocked_get_user.assert_called_once_with(user_info_dict, {"iss": "fake"})
+ mocked_get_user.assert_called_once_with(
+ {
+ "info_token_claims": user_info_dict,
+ "access_token_jwt": None,
+ "access_token_claims": None,
+ "id_token_claims": {"iss": "fake"},
+ }
+ )
self.assertRedirects(
response, "/default/success", fetch_redirect_response=False
@@ -466,8 +487,8 @@ def test_invalid_backchannel_logout_no_sub_no_sid(self):
)
@mock.patch("django_pyoidc.views.SessionStore.delete")
- @mock.patch("django_pyoidc.views.Consumer.backchannel_logout")
- @mock.patch("django_pyoidc.views.Consumer.provider_config")
+ @mock.patch("django_pyoidc.client.Consumer.backchannel_logout")
+ @mock.patch("django_pyoidc.client.Consumer.provider_config")
def test_valid_backchannel_sub(
self, mocked_provider_config, mocked_backchannel_logout, mocked_session_delete
):
@@ -500,9 +521,9 @@ def test_valid_backchannel_sub(
mocked_session_delete.assert_called_once_with(cache_session_key)
@mock.patch("django_pyoidc.views.SessionStore.delete")
- @mock.patch("django_pyoidc.views.Consumer.backchannel_logout")
- @mock.patch("django_pyoidc.views.Consumer.provider_config")
- @mock.patch("django_pyoidc.views.Consumer.restore")
+ @mock.patch("django_pyoidc.client.Consumer.backchannel_logout")
+ @mock.patch("django_pyoidc.client.Consumer.provider_config")
+ @mock.patch("django_pyoidc.client.Consumer.restore")
def test_valid_backchannel_sid(
self,
mocked_restore,
@@ -546,9 +567,9 @@ def test_valid_backchannel_sid(
mocked_restore.assert_called_once_with(session_state)
@mock.patch("django_pyoidc.views.SessionStore.delete")
- @mock.patch("django_pyoidc.views.Consumer.backchannel_logout")
- @mock.patch("django_pyoidc.views.Consumer.provider_config")
- @mock.patch("django_pyoidc.views.Consumer.restore")
+ @mock.patch("django_pyoidc.client.Consumer.backchannel_logout")
+ @mock.patch("django_pyoidc.client.Consumer.provider_config")
+ @mock.patch("django_pyoidc.client.Consumer.restore")
def test_invalid_backchannel_sid(
self,
mocked_restore,
@@ -592,8 +613,8 @@ def test_invalid_backchannel_sid(
mocked_restore.assert_called_once_with(session_state)
@mock.patch("django_pyoidc.views.SessionStore.delete")
- @mock.patch("django_pyoidc.views.Consumer.backchannel_logout")
- @mock.patch("django_pyoidc.views.Consumer.provider_config")
+ @mock.patch("django_pyoidc.client.Consumer.backchannel_logout")
+ @mock.patch("django_pyoidc.client.Consumer.provider_config")
def test_valid_backchannel_sub_multiple_sessions(
self, mocked_provider_config, mocked_backchannel_logout, mocked_session_delete
):