diff --git a/README.md b/README.md index 940c316f..45f476e0 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # ![requests_oauth2client](docs/logo.png) `requests_oauth2client` is an OAuth 2.x client for Python, able to obtain, refresh and revoke tokens from any -OAuth2.x/OIDC compliant Authorization Server. It sits upon and extends the famous [requests] HTTP client module. +OAuth2.x/OIDC compliant Authorization Server. It sits upon and extends the famous [requests][] HTTP client module. It can act as an [OAuth 2.0](https://tools.ietf.org/html/rfc6749) / [2.1](https://datatracker.ietf.org/doc/draft-ietf-oauth-v2-1) client, to automatically get and renew Access Tokens, @@ -29,7 +29,7 @@ It also supports [OpenID Connect 1.0](https://openid.net/specs/openid-connect-co as well as using custom params to any endpoint, and other important features that are often overlooked or needlessly complex in other client libraries. -And it also includes a [wrapper][apiclient] around [requests.Session] that makes it super easy to use REST-style APIs, +And it also includes a [wrapper][requests_oauth2client.ApiClient] around [requests.Session][] that makes it super easy to use REST-style APIs, with or without OAuth 2.x. Please note that despite the name, this library has no relationship with Google @@ -93,9 +93,9 @@ OAuth 2.0 and its extensions. These endpoints include the Token Endpoint, Revoca BackChannel Authentication and Device Authorization Endpoints. You must provide the URLs for these endpoints if you intend to use them. Otherwise, only the Token Endpoint is mandatory -to initialize an `OAuth2Client`. +to initialize an [OAuth2Client]. -To initialize an instance of `OAuth2Client`, you only need the Token Endpoint URI from your Authorization Server (AS), +To initialize an instance of [OAuth2Client], you only need the Token Endpoint URI from your Authorization Server (AS), and the credentials for your application, typically a `client_id` and a `client_secret`, usually also provided by the AS: @@ -112,7 +112,7 @@ oauth2client = OAuth2Client( The Token Endpoint is the only endpoint that is mandatory to obtain tokens. Credentials are used to authenticate the client everytime it sends a request to its Authorization Server. Usually, these are a static Client ID and Secret, which are the equivalent of a username and a password, but meant for an application instead of for a human user. The default -authentication method used by `OAuth2Client` is *Client Secret Post*, but other standardized methods such as *Client +authentication method used by [OAuth2Client] is *Client Secret Post*, but other standardized methods such as *Client Secret Basic*, *Client Secret JWT* or *Private Key JWT* are supported as well. See [more about client authentication methods below](#supported-client-authentication-methods). @@ -166,11 +166,11 @@ You can use a [BearerToken] instance anywhere you can use an access_token as str Using [OAuth2Client] directly is useful for testing or debugging OAuth2.x flows, but it may not be suitable for actual applications where tokens must be obtained, used during their lifetime, then obtained again or refreshed once they are -expired. `requests_oauth2client` contains several [requests] compatible Auth Handlers (as subclasses of +expired. `requests_oauth2client` contains several [requests][] compatible Auth Handlers (as subclasses of [requests.auth.AuthBase](https://requests.readthedocs.io/en/latest/user/advanced/#custom-authentication)), that will take care of obtaining tokens when required, then will cache those tokens until they are expired, and will obtain new -ones (or refresh them, when possible), once the initial token is expired. Those are best used with a [requests.Session], -or an [ApiClient], which is a wrapper around `Session` with a few enhancements as described below. +ones (or refresh them, when possible), once the initial token is expired. Those are best used with a [requests.Session][], +or an [ApiClient], which is a wrapper around [requests.Session][] with a few enhancements as described below. ### Client Credentials grant @@ -204,8 +204,8 @@ fulfill your request. #### As Auth Handler You can use the -[OAuth2ClientCredentialsAuth](https://guillp.github.io/requests_oauth2client/api/#requests_oauth2client.auth.OAuth2ClientCredentialsAuth) -auth handler. It takes an `OAuth2Client` as parameter, and the additional kwargs to pass to the token endpoint: +[OAuth2ClientCredentialsAuth] +auth handler. It takes an [OAuth2Client] as parameter, and the additional kwargs to pass to the token endpoint: ```python import requests @@ -240,7 +240,7 @@ the previous one is expired. You can configure a leeway, which is a period of time before the actual expiration, in seconds, when a new token will be obtained. This may help getting continuous access to the API when the client and API clocks are slightly out of sync. -Use the parameter `leeway` to `OAuth2ClientCredentialsAuth`: +Use the parameter `leeway` to [OAuth2ClientCredentialsAuth]: ```python from requests_oauth2client import OAuth2ClientCredentialsAuth @@ -264,7 +264,7 @@ Obtaining tokens using the Authorization code grant is made in 3 steps: 3. your application must then exchange this Authorization Code for an *Access Token*, with a request to the Token Endpoint. -Using an `OAuth2Client` will help you with all those steps, as described below. +Using an [OAuth2Client] will help you with all those steps, as described below. #### Generating Authorization Requests @@ -276,9 +276,9 @@ To be able to use the Authorization Code grant, you need 2 (optionally 3) URIs: - optionally, the issuer identifier, if your AS uses [Issuer Identification](https://www.rfc-editor.org/rfc/rfc9207.html). -You can declare those URIs when initializing your `OAuth2Client` instance, or you can +You can declare those URIs when initializing your [OAuth2Client] instance, or you can [use the AS discovery endpoint](#initializing-an-oauth2client-from-a-discovery-document) to initialize those URLs -automatically. Then you can generate valid Authorization Requests by calling the method `.authorization_request()`, with +automatically. Then you can generate valid Authorization Requests by calling the method `OAuth2Client.authorization_request()`, with the request specific parameters, such as `scope`, `state`, `nonce` as parameter: ```python @@ -315,7 +315,7 @@ webbrowser.open(az_request.uri) Note that the `state`, `nonce` and `code_challenge` parameters are generated with secure random values by default. Should you wish to use your own values, you can pass them as parameters to `OAuth2Client.authorization_request()`. For PKCE, you need to pass your generated `code_verifier`, and the `code_challenge` will automatically be derived from it. -If you want to disable PKCE, you can pass `code_challenge_method=None` when initializing your `OAuth2Client`. +If you want to disable PKCE, you can pass `code_challenge_method=None` when initializing your [OAuth2Client]. #### Validating the Authorization Response @@ -334,7 +334,7 @@ response_uri = input("Please enter the full url and/or params obtained on the re az_response = az_request.validate_callback(response_uri) ``` -This `auth_response` is an `AuthorizationResponse` instance and contains everything that is needed for your application +This `auth_response` is an [AuthorizationResponse] instance and contains everything that is needed for your application to complete the authentication and get its tokens from the AS. #### Exchanging code for tokens @@ -352,7 +352,7 @@ token = oauth2client.authorization_code(az_response) This will automatically include the `code`, `redirect_uri` and `code_verifier` parameters in the Token Request, as expected by the AS. You may include extra parameters if required, or you may pass your own parameters, without using an -`AuthorizationResponse` instance, like this: +[AuthorizationResponse] instance, like this: ```python token = oauth2client.authorization_code( @@ -396,10 +396,10 @@ will take care of refreshing the token automatically once it is expired, using t ### Note on AuthorizationRequest -Authorization Requests generated by `OAuth2Client.authorization_request()` are instance of the class -[`AuthorizationRequest`](https://guillp.github.io/requests_oauth2client/api/#requests_oauth2client.authorization_request.AuthorizationRequest). +Authorization Requests generated by `OAuth2Client.authorization_request()` are instances of the class [AuthorizationRequest]. + You can also use that class directly to generate your requests, but in that case you need to supply your Authorization -Endpoint URI, your `client_id`, `redirect_uri`, etc. You can access every parameter from an `AuthorizationRequest` +Endpoint URI, your `client_id`, `redirect_uri`, etc. You can access every parameter from an [AuthorizationRequest] instance, as well as the generated `code_verifier`, as attributes of this instance. Once an Authorization Request URL is generated, it your application responsibility to redirect or otherwise send the user to that URL. You may use the `webbrowser` module from Python standard library to do so. Here is an example for generating Authorization Requests: @@ -630,9 +630,9 @@ client = OAuth2Client( With **client_secret_post**, `client_id` and `client_secret` are included as part of the body form data. To use it, pass a -[`ClientSecretPost(client_id, client_secret)`](https://guillp.github.io/requests_oauth2client/api/#requests_oauth2client.client_authentication.ClientSecretPost) +[ClientSecretPost] instance `ClientSecretPost(client_id, client_secret)`. as `auth` parameter. This is the default when you pass a tuple `(client_id, client_secret)` as `auth` when initializing -an `OAuth2Client`: +an [OAuth2Client]: ```python from requests_oauth2client import ClientSecretPost, OAuth2Client @@ -825,7 +825,7 @@ check will be made to ensure that the `issuer` from the retrieved metadata docum `DPoP` (Demonstrating Proof of Possession) is supported out-of-the-box. To obtain a *DPoP* token, you can either: -- pass `dpop=True` when using any `OAuth2Client` method that sends a token request, +- pass `dpop=True` when using any [OAuth2Client] method that sends a token request, - or enable `DPoP` by default by passing `dpop_bound_access_tokens=True` when initializing your client. ```python @@ -853,10 +853,10 @@ assert isinstance(token, DPoPToken) ### About `DPoPToken` -`DPoPToken` is actually a `BearerToken` subclass. If you use it as a `requests` Auth Handler, it will take care of +[DPoPToken] is actually a [BearerToken] subclass. If you use it as a `requests` Auth Handler, it will take care of adding a `DPoP` proof to the request headers, in addition to the access token. -Since it is a `BearerToken` subclass, it is fully compatible with the `requests` compatible auth handlers provided by +Since it is a [BearerToken] subclass, it is fully compatible with the `requests` compatible auth handlers provided by `requests_oauth2client`, such as `OAuth2ClientCredentialsAuth`, `OAuth2AccessTokenAuth`, etc. So you may use DPoP with those auth handlers like this: @@ -883,15 +883,15 @@ assert "DPoP" in resp.requests.headers # the appropriate DPoP proof will be inc ``` Since DPoP is enabled by default with `dpop_bound_access_tokens=True`, then the `OAuth2ClientCredentialsAuth` will -obtain and use `DPoPToken` instances. You could also leave it disabled by default and pass `dpop=True` when initializing +obtain and use [DPoPToken] instances. You could also leave it disabled by default and pass `dpop=True` when initializing you auth handler instance: `OAuth2ClientCredentialsAuth(client=client, scope="my_scope", dpop=True)`. ### Choosing your own proof signature keys -By default, the private key used for signing `DPoP` proofs is auto-generated by `OAuth2Client` whenever a new token is +By default, the private key used for signing `DPoP` proofs is auto-generated by [OAuth2Client] whenever a new token is obtained. By default, generated keys are of type *Elliptic Curve* (`EC`), and use the `ES256` signature alg (as in *Elliptic-Curve with a SHA256 hash*). Should you, for testing purposes, wish to generate or use your own key, you may -use the parameter `dpop_key` to provide a key of your choice. It takes a `DPoPKey` instance, which you can generate +use the parameter `dpop_key` to provide a key of your choice. It takes a [DPoPKey] instance, which you can generate using `DPoPKey.generate()`, or by initializing an instance with a key that you previously generated: ```python @@ -935,13 +935,13 @@ assert token.dpop_key == dpop_key ### Hooking into DPoP key and proof generation -Instead of generating your own keys everytime, you may also control how `DPoPKey`s are automatically generated. This can +Instead of generating your own keys everytime, you may also control how [DPoPKey]s are automatically generated. This can be useful for fuzz-testing, pen-testing or feature-testing the Authorization Server. To choose the signing alg, use the parameter `dpop_alg` when initializing your client. This will accordingly determine the key type to generate. You may also pass a custom `dpop_key_generator`, which is a callable that accepts a signature `alg` as parameter, and generates -`DPoPKey` instances. +[DPoPKey] instances. -You can also override the `DPoPToken` class with a custom one, which will be used to represent the DPoP token that is +You can also override the [DPoPToken] class with a custom one, which will be used to represent the DPoP token that is returned by the AS, and then generates proofs and includes those proofs into HTTP requests. You may use `DPoPKey.generate` as a helper method for that, or implement your own generator: @@ -979,9 +979,9 @@ oauth2client = OAuth2Client.from_discovery_endpoint( ### About DPoP nonces -Authorization Server provided `DPoP` nonces are automatically and transparently handled by `OAuth2Client`. +Authorization Server provided `DPoP` nonces are automatically and transparently handled by [OAuth2Client]. -Likewise, Resource Server provided `DPoP` nonces are supported when using the default `DPoPToken` class. +Likewise, Resource Server provided `DPoP` nonces are supported when using the default [DPoPToken] class. This includes all requests-compatible auth handlers provided by `requests_oauth2client`, like `OAuth2AccessTokenAuth`, `OAuth2ClientCredentialsAuth`, `OAuth2AuthorizationCodeAuth`, etc. @@ -1050,16 +1050,16 @@ response2 = api.post("other_endpoint") response3 = api.get("other_endpoint") ``` -AS and RS provided nonces are memoized independently by the `DPoPToken` instance, so the amount of "extra" requests to +AS and RS provided nonces are memoized independently by the [DPoPToken] instance, so the amount of "extra" requests to obtain new DPoP nonces should be minimal. ## Specialized API Client Using APIs usually involves multiple endpoints under the same root url, with a common authentication method. To make it -easier, `requests_oauth2client` includes a [requests.Session] wrapper called [ApiClient], which takes the root API url +easier, `requests_oauth2client` includes a [requests.Session][] wrapper called [ApiClient], which takes the root API url as parameter on initialization. You can then send requests to different endpoints by passing their relative path instead of the full url. [ApiClient] also accepts an `auth` parameter with an AuthHandler. You can pass any of the OAuth2 Auth -Handler from this module, or any [requests]-compatible +Handler from this module, or any [requests][]-compatible [Authentication Handler](https://requests.readthedocs.io/en/latest/user/advanced/#custom-authentication). Which makes it very easy to call APIs that are protected with an OAuth2 Client Credentials Grant: @@ -1102,7 +1102,7 @@ api.resource.get() # will send a GET to https://myapi.local/root/resource api["my-resource"].get() # will send a GET to https://myapi.local/root/my-resource ``` -Both `__getattr__` and `__getitem__` return a new `ApiClient` initialised on the new base_url. So you can easily call +Both `__getattr__` and `__getitem__` return a new `ApiClient` initialised on the new `base_url`. So you can easily call multiple sub-resources on the same API this way: ```python @@ -1112,7 +1112,7 @@ api = ApiClient("https://myapi.local") users_api = api.users user = users_api.get("userid") # GET https://myapi.local/users/userid other_user = users_api.get("other_userid") # GET https://myapi.local/users/other_userid -resources_api = api.resources +resources_api = api["resources"] resources = resources_api.get() # GET https://myapi.local/resources ``` @@ -1135,8 +1135,8 @@ You may override this at request time: resp = api.get("500", raise_for_status=True) ``` -You can access the underlying `requests.Session` with the session attribute, and you can provide an already existing and -configured `Session` instance at init time: +You can access the underlying [requests.Session][] with the session attribute, and you can provide an already existing and +configured [requests.Session][] instance at init time: ```python import requests @@ -1149,6 +1149,94 @@ api = ApiClient("https://myapi.local/resource", session=session) assert api.session == session ``` +## Token and Authorization Requests serialization + +If you implement a web application, you will most likely need to serialize access tokens inside the user session. +Or you may need to serialize Authorization Requests to store them temporarily between multiple HTTP requests. +To make it easier, `requests_oauth2client` provides several classes that implement (de)serialization of [BearerToken], +[DPoPToken], [AuthorizationRequest] (and derivates) and [DPoPKey] to `bytes`. + +```python +from requests_oauth2client import BearerToken, TokenSerializer + +token_serializer = TokenSerializer() + +bearer_token = BearerToken("access_token", expires_in=60) # here is a sample token +serialized_value = token_serializer.dumps(bearer_token) +print(serialized_value) +# b'q1ZKTE5OLS6OL8nPTs1TskLl6iilVhRkFqUWxyeWKFkZmpsZWFiYmJqZ6iiB5eNLKgtSlayUnFITi1KLlGoB' +# you can store that value in session or anywhere needed +# beware, this is decodable clear-text! + +# loading back the token to a BearerToken instance +deserialized_token = token_serializer.loads(serialized_value) +assert isinstance(deserialized_token, BearerToken) +assert deserialized_token == bearer_token +``` + +Default [TokenSerializer] class supports both [BearerToken] and [DPoPToken] instances. + +```python +from requests_oauth2client import AuthorizationRequest, AuthorizationRequestSerializer + +ar_serializer = AuthorizationRequestSerializer() + +auth_request = AuthorizationRequest( + authorization_endpoint="https://my.as.local/authorize", + client_id="my_client_id", + redirect_uri="http://localhost:8000/callback", +) + +serialized_ar = ar_serializer.dumps(auth_request) +assert ar_serializer.loads(serialized_ar) == auth_request +``` + +### Customizing token (de)serialization + +While default serializers work well for standard tokens represented using default classes, you may need to override them +for special purposes, or if you are using custom token classes. +To do that, you can pass custom methods as parameters when initializing your [TokenSerializer] instance: + +```python +from __future__ import annotations + +import base64 +import json +from typing import Any, Mapping + +from requests_oauth2client import BearerToken, TokenSerializer + + +class CustomToken(BearerToken): + TOKEN_TYPE = "CustomToken" + + +def custom_make_instance(args: Mapping[str, Any]) -> BearerToken: + """This will add support for a custom token type.""" + if args.get("token_type") == "CustomToken": + return CustomToken(**args) + return TokenSerializer.default_make_instance(args) + + +def custom_dumper(token: CustomToken) -> bytes: + """This will serialize the token value to base64-encoded JSON""" + args = token.as_dict() + return base64.b64encode(json.dumps(args).encode()) + + +def custom_loader(serialized: bytes) -> dict[str, Any]: + """This will load from a base64-encoded JSON""" + return json.loads(base64.b64decode(serialized)) + + +token_serializer = TokenSerializer(make_instance=custom_make_instance, dumper=custom_dumper, loader=custom_loader) + +my_custom_token = CustomToken(token_type="CustomToken", access_token="...") +serialized = token_serializer.dumps(my_custom_token) +assert serialized == b"eyJhY2Nlc3NfdG9rZW4iOiAiLi4uIiwgInRva2VuX3R5cGUiOiAiQ3VzdG9tVG9rZW4ifQ==" +assert token_serializer.loads(serialized) == my_custom_token +``` + ## Vendor-Specific clients `requests_oauth2client` is flexible enough to handle most use cases, so you should be able to use any AS by any vendor @@ -1159,6 +1247,11 @@ Servers or APIs. [OAuth2Client] has several extensibility points in the form of `OAuth2Client.parse_token_response()`, `OAuth2Client.on_token_error()` that implement response parsing, error handling, etc. +Several helper classes are available in `requests_oauth2client.vendor_specific` for some AS implementations like Auth0 +and Ping Federate. +They provide easy ways to initialize an [OAuth2Client] with the appropriate endpoints, or [ApiClient] wrappers around +the vendor-specific management APIs. + ```python from requests_oauth2client.vendor_specific import Auth0 @@ -1172,8 +1265,14 @@ a0mgmt = Auth0.management_api_client("mytenant.eu", client_id="client_id", clien myusers = a0mgmt.get("users") ``` -[apiclient]: https://guillp.github.io/requests_oauth2client/api/#requests_oauth2client.api_client.ApiClient -[bearertoken]: https://guillp.github.io/requests_oauth2client/api/#requests_oauth2client.tokens.BearerToken -[oauth2client]: https://guillp.github.io/requests_oauth2client/api/#requests_oauth2client.client.OAuth2Client -[requests]: https://requests.readthedocs.io/en/latest/ -[requests.session]: https://requests.readthedocs.io/en/latest/api/#requests.Session + +[apiclient]: api/#requests_oauth2client.ApiClient +[authorizationrequest]: api/#requests_oauth2client.authorization_request.AuthorizationRequest +[authorizationresponse]: api/#requests_oauth2client.authorization_request.AuthorizationResponse +[bearertoken]: api/#requests_oauth2client.tokens.BearerToken +[clientsecretpost]: api/#requests_oauth2client.client_authentication.ClientSecretPost +[dpopkey]: api/#requests_oauth2client.dpop.DPoPKey +[dpoptoken]: api/#requests_oauth2client.tokens.DPoPToken +[oauth2client]: api/#requests_oauth2client.client.OAuth2Client +[oauth2clientcredentialsauth]: api/#requests_oauth2client.auth.OAuth2ClientCredentialsAuth +[tokenserializer]: api/#requests_oauth2client.serialization.TokenSerializer diff --git a/mkdocs.yml b/mkdocs.yml index 674b045c..bfa746aa 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -41,6 +41,7 @@ markdown_extensions: anchor_linenums: true line_spans: __span pygments_lang_class: true + default_lang: python - pymdownx.inlinehilite - pymdownx.superfences - pymdownx.details @@ -57,11 +58,12 @@ plugins: - autorefs - mkdocstrings: default_handler: python + enable_inventory: true handlers: python: options: - #extensions: - #- griffe_fieldz: {include_inherited: true} + extensions: + - griffe_fieldz: {include_inherited: true} filters: - "!^_" - "^__init__" @@ -69,7 +71,7 @@ plugins: members_order: source show_root_heading: true show_submodules: true - import: + inventories: - https://requests.readthedocs.io/en/master/objects.inv - https://guillp.github.io/jwskate/objects.inv extra: diff --git a/pyproject.toml b/pyproject.toml index 94575d4e..64c95245 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,13 +42,14 @@ dev = [ "virtualenv>=20.30.0", ] doc = [ + "griffe-fieldz>=0.3.0", "mkdocs>=1.3.1", "mkdocs-autorefs>=0.3.0", "mkdocs-include-markdown-plugin>=6", "mkdocs-material>=9.6.11", "mkdocs-material-extensions>=1.0.1", "mkdocstrings[python]>=0.29.1", - ] +] test = [ "coverage>=7.8.0", "pytest>=7.0.1", diff --git a/requests_oauth2client/__init__.py b/requests_oauth2client/__init__.py index a2b6039e..65e74e9a 100644 --- a/requests_oauth2client/__init__.py +++ b/requests_oauth2client/__init__.py @@ -18,16 +18,13 @@ ) from .authorization_request import ( AuthorizationRequest, - AuthorizationRequestSerializer, AuthorizationResponse, - CodeChallengeMethods, InvalidCodeVerifierParam, InvalidMaxAgeParam, MissingIssuerParam, PkceUtils, RequestParameterAuthorizationRequest, RequestUriParameterAuthorizationRequest, - ResponseTypes, UnsupportedCodeChallengeMethod, UnsupportedResponseTypeParam, ) @@ -36,8 +33,6 @@ BackChannelAuthenticationResponse, ) from .client import ( - Endpoints, - GrantTypes, InvalidAcrValuesParam, InvalidBackchannelAuthenticationRequestHintParam, InvalidDiscoveryDocument, @@ -93,6 +88,7 @@ RepeatedDPoPNonce, validate_dpop_proof, ) +from .enums import CodeChallengeMethods, Endpoints, GrantTypes, ResponseTypes from .exceptions import ( AccessDenied, AccountSelectionRequired, @@ -135,9 +131,9 @@ from .polling import ( BaseTokenEndpointPollingJob, ) +from .serializers import AuthorizationRequestSerializer, TokenSerializer from .tokens import ( BearerToken, - BearerTokenSerializer, ExpiredAccessToken, ExpiredIdToken, IdToken, @@ -174,7 +170,6 @@ "BaseTokenEndpointPollingJob", "BaseTokenEndpointPoolingJob", "BearerToken", - "BearerTokenSerializer", "ClientSecretBasic", "ClientSecretJwt", "ClientSecretPost", @@ -267,6 +262,7 @@ "SignatureAlgs", "SlowDown", "TokenEndpointError", + "TokenSerializer", "UnauthorizedClient", "UnknownActorTokenType", "UnknownIntrospectionError", diff --git a/requests_oauth2client/authorization_request.py b/requests_oauth2client/authorization_request.py index ed277a38..3d958386 100644 --- a/requests_oauth2client/authorization_request.py +++ b/requests_oauth2client/authorization_request.py @@ -4,9 +4,8 @@ import re import secrets -from enum import Enum from functools import cached_property -from typing import TYPE_CHECKING, Any, Callable, ClassVar +from typing import TYPE_CHECKING, Any, ClassVar from attrs import asdict, field, fields, frozen from binapy import BinaPy @@ -14,6 +13,7 @@ from jwskate import JweCompact, Jwk, Jwt, SignatureAlgs, SignedJwt from .dpop import DPoPKey +from .enums import CodeChallengeMethods, ResponseTypes from .exceptions import ( AuthorizationResponseError, ConsentRequired, @@ -32,34 +32,6 @@ from datetime import datetime -class ResponseTypes(str, Enum): - """All standardised `response_type` values. - - Note that you should always use `code`. All other values are deprecated. - - """ - - CODE = "code" - NONE = "none" - TOKEN = "token" - IDTOKEN = "id_token" - CODE_IDTOKEN = "code id_token" - CODE_TOKEN = "code token" - CODE_IDTOKEN_TOKEN = "code id_token token" - IDTOKEN_TOKEN = "id_token token" - - -class CodeChallengeMethods(str, Enum): - """All standardised `code_challenge_method` values. - - You should always use `S256`. - - """ - - S256 = "S256" - plain = "plain" - - class UnsupportedCodeChallengeMethod(ValueError): """Raised when an unsupported `code_challenge_method` is provided.""" @@ -910,92 +882,3 @@ def __getattr__(self, item: str) -> Any: def __repr__(self) -> str: """Return the Authorization Request URI, as a `str`.""" return self.uri - - -class AuthorizationRequestSerializer: - """(De)Serializer for `AuthorizationRequest` instances. - - You might need to store pending authorization requests in session, either server-side or client- side. This class is - here to help you do that. - - """ - - def __init__( - self, - dumper: Callable[[AuthorizationRequest], str] | None = None, - loader: Callable[[str], AuthorizationRequest] | None = None, - ) -> None: - self.dumper = dumper or self.default_dumper - self.loader = loader or self.default_loader - - @staticmethod - def default_dumper(azr: AuthorizationRequest) -> str: - """Provide a default dumper implementation. - - Serialize an AuthorizationRequest as JSON, then compress with deflate, then encodes as - base64url. - - Args: - azr: the `AuthorizationRequest` to serialize - - Returns: - the serialized value - - """ - d = asdict(azr) - if azr.dpop_key: - d["dpop_key"]["private_key"] = azr.dpop_key.private_key.to_dict() - d.update(**d.pop("kwargs", {})) - return BinaPy.serialize_to("json", d).to("deflate").to("b64u").ascii() - - @staticmethod - def default_loader( - serialized: str, - azr_class: type[AuthorizationRequest] = AuthorizationRequest, - ) -> AuthorizationRequest: - """Provide a default deserializer implementation. - - This does the opposite operations than `default_dumper`. - - Args: - serialized: the serialized AuthorizationRequest - azr_class: the class to deserialize the Authorization Request to - - Returns: - an AuthorizationRequest - - """ - args = BinaPy(serialized).decode_from("b64u").decode_from("deflate").parse_from("json") - - if dpop_key := args.get("dpop_key"): - dpop_key["private_key"] = Jwk(dpop_key["private_key"]) - dpop_key.pop("jti_generator", None) - dpop_key.pop("iat_generator", None) - dpop_key.pop("dpop_token_class", None) - args["dpop_key"] = DPoPKey(**dpop_key) - - return azr_class(**args) - - def dumps(self, azr: AuthorizationRequest) -> str: - """Serialize and compress a given AuthorizationRequest for easier storage. - - Args: - azr: an AuthorizationRequest to serialize - - Returns: - the serialized AuthorizationRequest, as a str - - """ - return self.dumper(azr) - - def loads(self, serialized: str) -> AuthorizationRequest: - """Deserialize a serialized AuthorizationRequest. - - Args: - serialized: the serialized AuthorizationRequest - - Returns: - the deserialized AuthorizationRequest - - """ - return self.loader(serialized) diff --git a/requests_oauth2client/client.py b/requests_oauth2client/client.py index d2b96072..15454528 100644 --- a/requests_oauth2client/client.py +++ b/requests_oauth2client/client.py @@ -3,7 +3,6 @@ from __future__ import annotations import warnings -from enum import Enum from typing import TYPE_CHECKING, Any, Callable, ClassVar, TypeVar import requests @@ -14,16 +13,15 @@ from .authorization_request import ( AuthorizationRequest, AuthorizationResponse, - CodeChallengeMethods, MissingIssuerParam, RequestUriParameterAuthorizationRequest, - ResponseTypes, ) from .backchannel_authentication import BackChannelAuthenticationResponse from .client_authentication import ClientSecretPost, PrivateKeyJwt, client_auth_factory from .device_authorization import DeviceAuthorizationResponse from .discovery import oidc_discovery_document_url from .dpop import DPoPKey, DPoPToken, InvalidDPoPAlg, MissingDPoPNonce, RepeatedDPoPNonce +from .enums import CodeChallengeMethods, Endpoints, GrantTypes, ResponseTypes, TokenType from .exceptions import ( AccessDenied, AuthorizationPending, @@ -50,7 +48,7 @@ UnsupportedTokenType, UseDPoPNonce, ) -from .tokens import BearerToken, IdToken, TokenResponse, TokenType +from .tokens import BearerToken, IdToken, TokenResponse from .utils import InvalidUri, validate_endpoint_uri, validate_issuer_uri if TYPE_CHECKING: @@ -169,24 +167,6 @@ def __init__(self, message: str, discovery_document: dict[str, Any]) -> None: self.discovery_document = discovery_document -class Endpoints(str, Enum): - """All standardised OAuth 2.0 and extensions endpoints. - - If an endpoint is not mentioned here, then its usage is not supported by OAuth2Client. - - """ - - TOKEN = "token_endpoint" - AUTHORIZATION = "authorization_endpoint" - BACKCHANNEL_AUTHENTICATION = "backchannel_authentication_endpoint" - DEVICE_AUTHORIZATION = "device_authorization_endpoint" - INTROSPECTION = "introspection_endpoint" - REVOCATION = "revocation_endpoint" - PUSHED_AUTHORIZATION_REQUEST = "pushed_authorization_request_endpoint" - JWKS = "jwks_uri" - USER_INFO = "userinfo_endpoint" - - class MissingEndpointUri(AttributeError): """Raised when a required endpoint uri is not known.""" @@ -194,19 +174,6 @@ def __init__(self, endpoint: str) -> None: super().__init__(f"No '{endpoint}' defined for this client.") -class GrantTypes(str, Enum): - """An enum of standardized `grant_type` values.""" - - CLIENT_CREDENTIALS = "client_credentials" - AUTHORIZATION_CODE = "authorization_code" - REFRESH_TOKEN = "refresh_token" - RESOURCE_OWNER_PASSWORD = "password" - TOKEN_EXCHANGE = "urn:ietf:params:oauth:grant-type:token-exchange" - JWT_BEARER = "urn:ietf:params:oauth:grant-type:jwt-bearer" - CLIENT_INITIATED_BACKCHANNEL_AUTHENTICATION = "urn:openid:params:grant-type:ciba" - DEVICE_CODE = "urn:ietf:params:oauth:grant-type:device_code" - - @frozen(init=False) class OAuth2Client: """An OAuth 2.x Client that can send requests to an OAuth 2.x Authorization Server. diff --git a/requests_oauth2client/deprecated.py b/requests_oauth2client/deprecated.py index 1f566fb7..6ddf0463 100644 --- a/requests_oauth2client/deprecated.py +++ b/requests_oauth2client/deprecated.py @@ -3,6 +3,7 @@ """Mark a class as deprecated. https://stackoverflow.com/a/52087847 + """ from warnings import warn @@ -10,6 +11,7 @@ from .backchannel_authentication import BackChannelAuthenticationPollingJob from .device_authorization import DeviceAuthorizationPollingJob from .polling import BaseTokenEndpointPollingJob +from .serializers import TokenSerializer class _DeprecatedClassMeta(type): @@ -76,8 +78,13 @@ class DeviceAuthorizationPoolingJob(metaclass=_DeprecatedClassMeta): _DeprecatedClassMeta__alias = DeviceAuthorizationPollingJob +class BearerTokenSerializer(metaclass=_DeprecatedClassMeta): + _DeprecatedClassMeta__alias = TokenSerializer + + __all__ = [ "BackChannelAuthenticationPoolingJob", "BaseTokenEndpointPoolingJob", + "BearerTokenSerializer", "DeviceAuthorizationPoolingJob", ] diff --git a/requests_oauth2client/dpop.py b/requests_oauth2client/dpop.py index 4e05d489..47361c11 100644 --- a/requests_oauth2client/dpop.py +++ b/requests_oauth2client/dpop.py @@ -13,9 +13,10 @@ from binapy import BinaPy from furl import furl # type: ignore[import-untyped] from requests import codes -from typing_extensions import Self +from typing_extensions import Self, override -from .tokens import AccessTokenTypes, BearerToken, IdToken, id_token_converter +from .enums import AccessTokenTypes +from .tokens import BearerToken, IdToken, id_token_converter from .utils import accepts_expires_in if TYPE_CHECKING: @@ -149,6 +150,7 @@ def _response_hook(self, response: requests.Response, **kwargs: Any) -> requests return response + @override def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest: """Add a DPoP proof in each request.""" request = super().__call__(request) @@ -156,6 +158,15 @@ def __call__(self, request: requests.PreparedRequest) -> requests.PreparedReques request.register_hook("response", self._response_hook) # type: ignore[no-untyped-call] return request + @override + def as_dict(self, with_expires_in: bool = True) -> dict[str, Any]: + d = super().as_dict(with_expires_in=with_expires_in) + d["dpop_key"]["private_key"] = self.dpop_key.private_key.to_dict() + d["dpop_key"].pop("jti_generator", None) + d["dpop_key"].pop("iat_generator", None) + d["dpop_key"].pop("dpop_token_class", None) + return d + def add_dpop_proof( request: requests.PreparedRequest, @@ -205,8 +216,8 @@ class DPoPKey: alg: str = field(on_setattr=setters.frozen) private_key: jwskate.Jwk = field(on_setattr=setters.frozen, repr=False) - jti_generator: Callable[[], str] = field(on_setattr=setters.frozen, repr=False) - iat_generator: Callable[[], int] = field(on_setattr=setters.frozen, repr=False) + jti_generator: Callable[[], str] = field(on_setattr=setters.frozen, repr=False, eq=False) + iat_generator: Callable[[], int] = field(on_setattr=setters.frozen, repr=False, eq=False) jwt_typ: str = field(on_setattr=setters.frozen, repr=False) dpop_token_class: type[DPoPToken] = field(on_setattr=setters.frozen, repr=False) as_nonce: str | None diff --git a/requests_oauth2client/enums.py b/requests_oauth2client/enums.py new file mode 100644 index 00000000..42431b3b --- /dev/null +++ b/requests_oauth2client/enums.py @@ -0,0 +1,83 @@ +"""Contains enumerations of standardised OAuth-related parameters and values. + +Most are taken from https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml . + +""" + +from __future__ import annotations + +from enum import Enum + + +class AccessTokenTypes(str, Enum): + """An enum of standardised `access_token` types.""" + + BEARER = "Bearer" + DPOP = "DPoP" + + +class CodeChallengeMethods(str, Enum): + """All standardised `code_challenge_method` values. + + You should always use `S256`. + + """ + + S256 = "S256" + plain = "plain" + + +class Endpoints(str, Enum): + """All standardised OAuth 2.0 and extensions endpoints. + + If an endpoint is not mentioned here, then its usage is not supported by OAuth2Client. + + """ + + TOKEN = "token_endpoint" + AUTHORIZATION = "authorization_endpoint" + BACKCHANNEL_AUTHENTICATION = "backchannel_authentication_endpoint" + DEVICE_AUTHORIZATION = "device_authorization_endpoint" + INTROSPECTION = "introspection_endpoint" + REVOCATION = "revocation_endpoint" + PUSHED_AUTHORIZATION_REQUEST = "pushed_authorization_request_endpoint" + JWKS = "jwks_uri" + USER_INFO = "userinfo_endpoint" + + +class GrantTypes(str, Enum): + """An enum of standardized `grant_type` values.""" + + CLIENT_CREDENTIALS = "client_credentials" + AUTHORIZATION_CODE = "authorization_code" + REFRESH_TOKEN = "refresh_token" + RESOURCE_OWNER_PASSWORD = "password" + TOKEN_EXCHANGE = "urn:ietf:params:oauth:grant-type:token-exchange" + JWT_BEARER = "urn:ietf:params:oauth:grant-type:jwt-bearer" + CLIENT_INITIATED_BACKCHANNEL_AUTHENTICATION = "urn:openid:params:grant-type:ciba" + DEVICE_CODE = "urn:ietf:params:oauth:grant-type:device_code" + + +class ResponseTypes(str, Enum): + """All standardised `response_type` values. + + Note that you should always use `code`. All other values are deprecated. + + """ + + CODE = "code" + NONE = "none" + TOKEN = "token" + IDTOKEN = "id_token" + CODE_IDTOKEN = "code id_token" + CODE_TOKEN = "code token" + CODE_IDTOKEN_TOKEN = "code id_token token" + IDTOKEN_TOKEN = "id_token token" + + +class TokenType(str, Enum): + """An enum of standardised `token_type` values.""" + + ACCESS_TOKEN = "access_token" + REFRESH_TOKEN = "refresh_token" + ID_TOKEN = "id_token" diff --git a/requests_oauth2client/exceptions.py b/requests_oauth2client/exceptions.py index 7fadbae5..51073120 100644 --- a/requests_oauth2client/exceptions.py +++ b/requests_oauth2client/exceptions.py @@ -264,3 +264,11 @@ class InvalidBackChannelAuthenticationResponse(OAuth2Error): class InvalidPushedAuthorizationResponse(OAuth2Error): """Raised when the Pushed Authorization Endpoint returns an error.""" + + +class UnsupportedTokenTypeError(ValueError): + """Raised when an unsupported token_type is provided.""" + + def __init__(self, token_type: str) -> None: + super().__init__(f"Unsupported token_type: {token_type}") + self.token_type = token_type diff --git a/requests_oauth2client/flask/auth.py b/requests_oauth2client/flask/auth.py index 30acbb6f..38a3d50c 100644 --- a/requests_oauth2client/flask/auth.py +++ b/requests_oauth2client/flask/auth.py @@ -6,8 +6,9 @@ from flask import session +from requests_oauth2client import TokenSerializer from requests_oauth2client.auth import OAuth2ClientCredentialsAuth -from requests_oauth2client.tokens import BearerToken, BearerTokenSerializer +from requests_oauth2client.tokens import BearerToken class FlaskSessionAuthMixin: @@ -26,11 +27,11 @@ class FlaskSessionAuthMixin: def __init__( self, session_key: str, - serializer: BearerTokenSerializer | None = None, + serializer: TokenSerializer | None = None, *args: Any, **token_kwargs: Any, ) -> None: - self.serializer = serializer or BearerTokenSerializer() + self.serializer = serializer or TokenSerializer() self.session_key = session_key super().__init__(*args, **token_kwargs) diff --git a/requests_oauth2client/serializers.py b/requests_oauth2client/serializers.py new file mode 100644 index 00000000..f8b7fe3f --- /dev/null +++ b/requests_oauth2client/serializers.py @@ -0,0 +1,288 @@ +"""Contain utility classes for serializing/deserializing objects such as `BearerToken`, `AuthorizationRequest`, etc. + +Those objects are typically stored in session when used in Web Applications, so they must be easily (de)serializable +to/from strings. + +While those classes provide default implementation that should work well for most cases, you might have to customize, +subclass or replace those classes to support custom features from your application. + +""" + +from __future__ import annotations + +from abc import ABC +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Any, Callable, ClassVar, Generic, TypeVar, Union + +import jwskate +from attr import asdict, field, frozen +from binapy import BinaPy + +from .authorization_request import ( + AuthorizationRequest, + RequestParameterAuthorizationRequest, + RequestUriParameterAuthorizationRequest, +) +from .dpop import DPoPKey, DPoPToken +from .exceptions import UnsupportedTokenTypeError +from .tokens import BearerToken + +if TYPE_CHECKING: + from collections.abc import Mapping + + +T = TypeVar("T") + + +@frozen +class Serializer(ABC, Generic[T]): + """Abstract class for (de)serializers.""" + + dumper: Callable[[T], bytes] = field(repr=False) + loader: Callable[[bytes], dict[str, Any]] = field(repr=False) + make_instance: Callable[[Mapping[str, Any]], T] = field(repr=False) + + def dumps(self, instance: T) -> bytes: + """Serialize and compress a given token for easier storage. + + Args: + instance: a BearerToken to serialize + + Returns: + the serialized token, as a str + + """ + return self.dumper(instance) + + def loads(self, serialized: bytes) -> T: + """Deserialize a serialized token. + + Args: + serialized: the serialized token + + Returns: + the deserialized token + + """ + data = self.loader(serialized) + return self.make_instance(data) + + +@frozen +class TokenSerializer(Serializer[BearerToken]): + """A helper class to serialize Token Response returned by an AS. + + This may be used to store BearerTokens in session or cookies. + + It needs a `dumper` and a `loader` functions that will respectively serialize and deserialize + BearerTokens (or subclasses). + + Default implementation uses gzip and base64url on the serialized JSON representation. + It supports `BearerToken` and `DPoPToken` instances. + + """ + + dumper: Callable[[BearerToken], bytes] = field(repr=False, factory=lambda: TokenSerializer.default_dumper) + loader: Callable[[bytes], dict[str, Any]] = field(repr=False, factory=lambda: TokenSerializer.default_loader) + make_instance: Callable[[Mapping[str, Any]], BearerToken] = field( + repr=False, factory=lambda: TokenSerializer.default_make_instance + ) + + @classmethod + def default_make_instance(cls, args: Mapping[str, Any]) -> BearerToken: + """Instantiate the appropriate Token class, based on `token_type` in the provided `args`. + + This default implementation only supports "Bearer" and "DPoP" token_types, + and will deserialize to `BearerToken` and `DPoPToken` instances. + """ + token_type = args["token_type"].lower() + if token_type == "bearer": + return BearerToken(**args) + if token_type == "dpop": + return DPoPToken(**args) + raise UnsupportedTokenTypeError(token_type) + + @classmethod + def default_dumper(cls, token: BearerToken) -> bytes: + """Serialize a token as JSON, then compress with deflate, then encodes as base64url. + + Args: + token: the `BearerToken` to serialize + + Returns: + the serialized value + + """ + d = token.as_dict(with_expires_in=False) + return BinaPy.serialize_to("json", {k: w for k, w in d.items() if w is not None}).to("deflate").to("b64u") + + @classmethod + def default_loader(cls, serialized: bytes) -> dict[str, Any]: + """Deserialize a BearerToken. + + This does the opposite operations than `default_dumper`. + + Args: + serialized: The serialized token. + + Returns: + a `BearerToken` or one of its subclasses. + + """ + args: dict[str, Any] = BinaPy(serialized).decode_from("b64u").decode_from("deflate").parse_from("json") + expires_at = args.get("expires_at") + if expires_at: + args["expires_at"] = datetime.fromtimestamp(expires_at, tz=timezone.utc) + + dpop_key = args.get("dpop_key") + if dpop_key: + dpop_key["private_key"] = jwskate.Jwk(dpop_key["private_key"]) + args["_dpop_key"] = DPoPKey(**args.pop("dpop_key")) + + return args + + +@frozen +class DPoPKeySerializer(Serializer[DPoPKey]): + """A (de)serializer for `DPoPKey` instances.""" + + dumper: Callable[[DPoPKey], bytes] = field(factory=lambda: DPoPKeySerializer.default_dumper) + loader: Callable[[bytes], dict[str, Any]] = field(factory=lambda: DPoPKeySerializer.default_loader) + make_instance: Callable[[Mapping[str, Any]], DPoPKey] = field( + repr=False, factory=lambda: DPoPKeySerializer.default_make_instance + ) + + @classmethod + def default_make_instance(cls, args: Mapping[str, Any]) -> DPoPKey: + """Instantiate the appropriate `DPoPKey` class based on `args`. + + Default implementation always returns `DPoPKey`. + """ + return DPoPKey(**args) + + @classmethod + def default_dumper(cls, dpop_key: DPoPKey) -> bytes: + """Provide a default dumper implementation. + + This will not serialize jti_generator, iat_generator, and dpop_token_class! + + """ + d = dpop_key.private_key.to_dict() + d.pop("jti_generator", None) + d.pop("iat_generator", None) + d.pop("dpop_token_class", None) + return BinaPy.serialize_to("json", d).to("deflate").to("b64u") + + @classmethod + def default_loader( + cls, + serialized: bytes, + ) -> dict[str, Any]: + """Provide a default deserializer implementation. + + This will not deserialize iat_generator, iat_generator, and dpop_token_class! + + """ + private_key = BinaPy(serialized).decode_from("b64u").decode_from("deflate").parse_from("json") + return {"private_key": private_key} + + +@frozen +class AuthorizationRequestSerializer( + Serializer[ + Union[AuthorizationRequest, RequestParameterAuthorizationRequest, RequestUriParameterAuthorizationRequest] + ] +): + """(De)Serializer for `AuthorizationRequest` instances. + + Default implementation supports `AuthorizationRequest`, `RequestParameterAuthorizationRequest`, and + `RequestUriParameterAuthorizationRequest`. + + """ + + dumper: Callable[ + [AuthorizationRequest | RequestParameterAuthorizationRequest | RequestUriParameterAuthorizationRequest], bytes + ] = field(factory=lambda: AuthorizationRequestSerializer.default_dumper) + loader: Callable[ + [ + bytes, + ], + dict[str, Any], + ] = field(factory=lambda: AuthorizationRequestSerializer.default_loader) + make_instance: Callable[ + [Mapping[str, Any]], + AuthorizationRequest | RequestParameterAuthorizationRequest | RequestUriParameterAuthorizationRequest, + ] = field(repr=False, factory=lambda: AuthorizationRequestSerializer.default_make_instance) + + dpop_key_serializer: ClassVar[Serializer[DPoPKey]] = DPoPKeySerializer() + + @classmethod + def default_make_instance( + cls, args: Mapping[str, Any] + ) -> AuthorizationRequest | RequestParameterAuthorizationRequest | RequestUriParameterAuthorizationRequest: + """Provide a default get_class implementation. + + - If there is a `request` parameter in the authorization request parameters, + this returns `RequestParameterAuthorizationRequest`. + - If there is a `request_uri` parameter in the authorization request parameters, + this returns `RequestUriParameterAuthorizationRequest`. + - Otherwise, returns `AuthorizationRequest`. + + Args: + args: the token attributes and values. + + Returns: + The appropriate AuthorizationRequest class. + + """ + if "request" in args: + return RequestParameterAuthorizationRequest(**args) + if "request_uri" in args: + return RequestUriParameterAuthorizationRequest(**args) + return AuthorizationRequest(**args) + + @classmethod + def default_dumper( + cls, + azr: AuthorizationRequest | RequestParameterAuthorizationRequest | RequestUriParameterAuthorizationRequest, + ) -> bytes: + """Provide a default dumper implementation. + + Serialize an AuthorizationRequest as JSON, then compress with deflate, then encodes as + base64url. + + Args: + azr: the `AuthorizationRequest` to serialize + + Returns: + the serialized value + + """ + d = asdict(azr) + if azr.dpop_key: + d["dpop_key"] = cls.dpop_key_serializer.dumps(azr.dpop_key) + d.update(**d.pop("kwargs", {})) + return BinaPy.serialize_to("json", d).to("deflate").to("b64u") + + @classmethod + def default_loader( + cls, + serialized: bytes, + ) -> dict[str, Any]: + """Provide a default deserializer implementation. + + This does the opposite operations than `default_dumper`. + + Args: + serialized: the serialized AuthorizationRequest + + Returns: + an AuthorizationRequest + + """ + args: dict[str, Any] = BinaPy(serialized).decode_from("b64u").decode_from("deflate").parse_from("json") + + if args["dpop_key"]: + args["dpop_key"] = cls.dpop_key_serializer.loads(args["dpop_key"]) + + return args diff --git a/requests_oauth2client/tokens.py b/requests_oauth2client/tokens.py index 91ec4a20..42b43459 100644 --- a/requests_oauth2client/tokens.py +++ b/requests_oauth2client/tokens.py @@ -3,7 +3,6 @@ from __future__ import annotations from datetime import datetime, timedelta, timezone -from enum import Enum from functools import cached_property from math import ceil from typing import TYPE_CHECKING, Any, Callable, ClassVar @@ -14,6 +13,8 @@ from binapy import BinaPy from typing_extensions import Self +from .enums import AccessTokenTypes +from .exceptions import UnsupportedTokenTypeError from .utils import accepts_expires_in if TYPE_CHECKING: @@ -23,29 +24,6 @@ from .client import OAuth2Client -class TokenType(str, Enum): - """An enum of standardised `token_type` values.""" - - ACCESS_TOKEN = "access_token" - REFRESH_TOKEN = "refresh_token" - ID_TOKEN = "id_token" - - -class AccessTokenTypes(str, Enum): - """An enum of standardised `access_token` types.""" - - BEARER = "Bearer" - DPOP = "DPoP" - - -class UnsupportedTokenType(ValueError): - """Raised when an unsupported token_type is provided.""" - - def __init__(self, token_type: str) -> None: - super().__init__(f"Unsupported token_type: {token_type}") - self.token_type = token_type - - class IdToken(jwskate.SignedJwt): """Represent an ID Token. @@ -297,7 +275,7 @@ def __init__( **kwargs: Any, ) -> None: if token_type.title() != self.TOKEN_TYPE.title(): - raise UnsupportedTokenType(token_type) + raise UnsupportedTokenTypeError(token_type) id_token = id_token_converter(id_token) @@ -532,15 +510,21 @@ def __str__(self) -> str: """ return self.access_token - def as_dict(self) -> dict[str, Any]: + def as_dict(self, *, with_expires_in: bool = True) -> dict[str, Any]: """Return a dict of parameters. That is suitable for serialization or to init another BearerToken. + Args: + with_expires_in: if True, the dict will include the expires_in attribute, + which is a relative lifetime in seconds. + Otherwise, it will be transformed to `expires_at`, an absolute expiration datetime. + """ d = asdict(self) - d.pop("expires_at") - d["expires_in"] = self.expires_in + if with_expires_in: + d.pop("expires_at") + d["expires_in"] = self.expires_in d.update(**d.pop("kwargs", {})) return {key: val for key, val in d.items() if val is not None} @@ -603,87 +587,3 @@ def access_token_jwt(self) -> jwskate.SignedJwt: """ return jwskate.SignedJwt(self.access_token) - - -class BearerTokenSerializer: - """A helper class to serialize Token Response returned by an AS. - - This may be used to store BearerTokens in session or cookies. - - It needs a `dumper` and a `loader` functions that will respectively serialize and deserialize - BearerTokens. Default implementations are provided with use gzip and base64url on the serialized - JSON representation. - - Args: - dumper: a function to serialize a token into a `str`. - loader: a function to deserialize a serialized token representation. - - """ - - def __init__( - self, - dumper: Callable[[BearerToken], str] | None = None, - loader: Callable[[str], BearerToken] | None = None, - ) -> None: - self.dumper = dumper or self.default_dumper - self.loader = loader or self.default_loader - - @staticmethod - def default_dumper(token: BearerToken) -> str: - """Serialize a token as JSON, then compress with deflate, then encodes as base64url. - - Args: - token: the `BearerToken` to serialize - - Returns: - the serialized value - - """ - d = asdict(token) - d.update(**d.pop("kwargs", {})) - return ( - BinaPy.serialize_to("json", {k: w for k, w in d.items() if w is not None}).to("deflate").to("b64u").ascii() - ) - - def default_loader(self, serialized: str, token_class: type[BearerToken] = BearerToken) -> BearerToken: - """Deserialize a BearerToken. - - This does the opposite operations than `default_dumper`. - - Args: - serialized: the serialized token - token_class: class to use to deserialize the Token - - Returns: - a BearerToken - - """ - attrs = BinaPy(serialized).decode_from("b64u").decode_from("deflate").parse_from("json") - expires_at = attrs.get("expires_at") - if expires_at: - attrs["expires_at"] = datetime.fromtimestamp(expires_at, tz=timezone.utc) - return token_class(**attrs) - - def dumps(self, token: BearerToken) -> str: - """Serialize and compress a given token for easier storage. - - Args: - token: a BearerToken to serialize - - Returns: - the serialized token, as a str - - """ - return self.dumper(token) - - def loads(self, serialized: str) -> BearerToken: - """Deserialize a serialized token. - - Args: - serialized: the serialized token - - Returns: - the deserialized token - - """ - return self.loader(serialized) diff --git a/tests/test_token_exchange.py b/tests/test_token_exchange.py index 07db06ef..8f9d0811 100644 --- a/tests/test_token_exchange.py +++ b/tests/test_token_exchange.py @@ -1,3 +1,4 @@ +import re import secrets import pytest @@ -94,7 +95,9 @@ def test_token_type() -> None: assert OAuth2Client.get_token_type("saml2") == "urn:ietf:params:oauth:token-type:saml2" assert OAuth2Client.get_token_type("jwt") == "urn:ietf:params:oauth:token-type:jwt" - with pytest.raises(TypeError, match="token is of type ''") as exc: + with pytest.raises( + TypeError, match=re.escape("token is of type ''") + ) as exc: OAuth2Client.get_token_type( token_type="access_token", token=IdToken( @@ -121,6 +124,8 @@ def test_token_type() -> None: OAuth2Client.get_token_type(token_type="refresh_token", token=BearerToken("mytoken")) assert exc.type is UnknownTokenType - with pytest.raises(TypeError, match="token is of type '") as exc2: + with pytest.raises( + TypeError, match=re.escape("token is of type '") + ) as exc2: OAuth2Client.get_token_type(token_type="id_token", token=BearerToken("mytoken")) assert exc2.type is UnknownTokenType diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index ebce49a2..2b84cc27 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -17,9 +17,11 @@ ClientSecretBasic, ClientSecretJwt, ClientSecretPost, + DPoPKey, OAuth2Client, PrivateKeyJwt, PublicApp, + RequestParameterAuthorizationRequest, ) if TYPE_CHECKING: @@ -50,6 +52,13 @@ def bearer_auth(access_token: str) -> BearerToken: return BearerToken(access_token) +@pytest.fixture(scope="session", params=[None, "ES256"]) +def dpop_key(request: FixtureRequest) -> DPoPKey | None: + if request.param is None: + return None + return DPoPKey.generate(alg=request.param) + + @pytest.fixture(scope="session") def target_api() -> str: return "https://myapi.local/root/" @@ -384,6 +393,7 @@ def authorization_request( # noqa: C901 code_challenge_method: str, expected_issuer: str | None, auth_request_kwargs: dict[str, Any], + dpop_key: DPoPKey, acr_values: None | str | list[str], ) -> AuthorizationRequest: authorization_response_iss_parameter_supported = bool(expected_issuer) @@ -399,6 +409,7 @@ def authorization_request( # noqa: C901 code_challenge_method=code_challenge_method, authorization_response_iss_parameter_supported=authorization_response_iss_parameter_supported, issuer=expected_issuer, + dpop_key=dpop_key, acr_values=acr_values, **auth_request_kwargs, ) @@ -411,6 +422,7 @@ def authorization_request( # noqa: C901 assert azr.redirect_uri == redirect_uri assert azr.issuer == expected_issuer assert azr.kwargs == auth_request_kwargs + assert azr.dpop_key == dpop_key args = dict(url.args) expected_args = dict( @@ -494,6 +506,9 @@ def authorization_request( # noqa: C901 assert generated_code_challenge == code_verifier assert azr.code_verifier == code_verifier + if dpop_key: + expected_args["dpop_jkt"] = dpop_key.dpop_jkt + if acr_values is None: assert azr.acr_values is None assert "acr_values" not in args @@ -540,3 +555,16 @@ def authorization_response( assert auth_response.code_verifier == authorization_request.code_verifier return auth_response + + +@pytest.fixture(scope="session") +def request_parameter_signing_key() -> Jwk: + return Jwk.generate(alg="ES256") + + +@pytest.fixture +def request_parameter_authorization_request( + authorization_request: AuthorizationRequest, + request_parameter_signing_key: Jwk, +) -> RequestParameterAuthorizationRequest: + return authorization_request.sign(request_parameter_signing_key) diff --git a/tests/unit_tests/test_authorization_request.py b/tests/unit_tests/test_authorization_request.py index 4232ac38..89230338 100644 --- a/tests/unit_tests/test_authorization_request.py +++ b/tests/unit_tests/test_authorization_request.py @@ -9,10 +9,8 @@ from requests_oauth2client import ( AuthorizationRequest, - AuthorizationRequestSerializer, AuthorizationResponse, AuthorizationResponseError, - DPoPKey, InvalidMaxAgeParam, MismatchingIssuer, MismatchingState, @@ -188,31 +186,6 @@ def test_missing_issuer( authorization_request.validate_callback(authorization_response_uri) -def test_authorization_request_serializer(authorization_request: AuthorizationRequest) -> None: - serializer = AuthorizationRequestSerializer() - serialized = serializer.dumps(authorization_request) - assert serializer.loads(serialized) == authorization_request - - -def test_authorization_request_serializer_with_dpop_key() -> None: - dpop_key = DPoPKey.generate() - authorization_request = AuthorizationRequest( - "https://as.local/authorize", - client_id="foo", - redirect_uri="http://localhost/local", - scope="openid", - dpop_key=dpop_key, - ) - - serializer = AuthorizationRequestSerializer() - - serialized = serializer.dumps(authorization_request) - deserialized_request = serializer.loads(serialized) - - assert isinstance(deserialized_request.dpop_key, DPoPKey) - assert deserialized_request.dpop_key.private_key == dpop_key.private_key - - def test_request_acr_values() -> None: # you may provide acr_values as a space separated list or as a real list azr_str = AuthorizationRequest( diff --git a/tests/unit_tests/test_dpop.py b/tests/unit_tests/test_dpop.py index be99543d..f78cd285 100644 --- a/tests/unit_tests/test_dpop.py +++ b/tests/unit_tests/test_dpop.py @@ -173,7 +173,7 @@ def test_dpop_errors() -> None: with pytest.raises(InvalidDPoPAccessToken, match="invalid characters"): DPoPToken(access_token="some_invalid_characters_follow: ?%", _dpop_key=DPoPKey.generate(alg="ES256")) - with pytest.raises(InvalidDPoPAlg, match="DPoP proofing require an asymmetric signing alg."): + with pytest.raises(InvalidDPoPAlg, match="DPoP proofing require an asymmetric signing alg"): DPoPToken(access_token="access_token", _dpop_key=DPoPKey.generate(alg="HS256")) with pytest.raises(InvalidDPoPKey, match="not an asymmetric private key"): diff --git a/tests/unit_tests/test_serializers.py b/tests/unit_tests/test_serializers.py new file mode 100644 index 00000000..e246ef92 --- /dev/null +++ b/tests/unit_tests/test_serializers.py @@ -0,0 +1,108 @@ +from datetime import datetime, timedelta, timezone + +import pytest +from freezegun.api import FrozenDateTimeFactory + +from requests_oauth2client import ( + AuthorizationRequest, + AuthorizationRequestSerializer, + BearerToken, + DPoPKey, + DPoPToken, + RequestParameterAuthorizationRequest, + RequestUriParameterAuthorizationRequest, + TokenSerializer, +) +from requests_oauth2client.exceptions import UnsupportedTokenTypeError + + +@pytest.mark.parametrize( + "token", + [ + BearerToken("access_token"), + # note that "expires_at" is calculated when the test is run, so before `freezer` takes effect + BearerToken("access_token", expires_in=60), + BearerToken("access_token", expires_in=-60), + DPoPToken("access_token", _dpop_key=DPoPKey.generate()), + DPoPToken("access_token", expires_in=60, _dpop_key=DPoPKey.generate()), + DPoPToken("access_token", expires_in=60, _dpop_key=DPoPKey.generate(alg="RS256")), + ], +) +def test_token_serializer(token: BearerToken, freezer: FrozenDateTimeFactory) -> None: + freezer.move_to("2024-08-01") + serializer = TokenSerializer() + candidate = serializer.dumps(token) + freezer.move_to(datetime.now(tz=timezone.utc) + timedelta(days=365)) + assert serializer.loads(candidate) == token + + +def test_authorization_request_serializer( + authorization_request: AuthorizationRequest, + request_parameter_authorization_request: RequestParameterAuthorizationRequest, +) -> None: + serializer = AuthorizationRequestSerializer() + serialized = serializer.dumps(authorization_request) + assert serializer.loads(serialized) == authorization_request + + request_parameter_serialized = serializer.dumps(request_parameter_authorization_request) + assert serializer.loads(request_parameter_serialized) == request_parameter_authorization_request + + +@pytest.fixture( + scope="module", params=["this_is_a_request_uri", "urn:this:is:a:request_uri", "https://foo.bar/request_uri"] +) +def request_uri_authorization_request( + authorization_endpoint: str, client_id: str, request: pytest.FixtureRequest +) -> RequestUriParameterAuthorizationRequest: + request_uri = request.param + return RequestUriParameterAuthorizationRequest( + authorization_endpoint=authorization_endpoint, + client_id=client_id, + request_uri=request_uri, + custom_param="custom_value", + ) + + +def test_request_uri_authorization_request_serializer( + request_uri_authorization_request: RequestUriParameterAuthorizationRequest, +) -> None: + serializer = AuthorizationRequestSerializer() + serialized = serializer.dumps(request_uri_authorization_request) + deserialized = serializer.loads(serialized) + assert isinstance(deserialized, RequestUriParameterAuthorizationRequest) + assert deserialized == request_uri_authorization_request + + +def test_authorization_request_serializer_with_dpop_key() -> None: + dpop_key = DPoPKey.generate() + authorization_request = AuthorizationRequest( + "https://as.local/authorize", + client_id="foo", + redirect_uri="http://localhost/local", + scope="openid", + dpop_key=dpop_key, + ) + + serializer = AuthorizationRequestSerializer() + + serialized = serializer.dumps(authorization_request) + deserialized_request = serializer.loads(serialized) + + assert isinstance(deserialized_request.dpop_key, DPoPKey) + assert deserialized_request.dpop_key.private_key == dpop_key.private_key + + +def test_unsupported_token_type() -> None: + class CustomToken(BearerToken): + TOKEN_TYPE = "CustomToken" + + custom_token = CustomToken(access_token="my_access_token", token_type="CustomToken", custom_key="custom_value") + serializer = TokenSerializer() + serialized = serializer.dumps(custom_token) + assert serializer.loader(serialized) == { + "access_token": "my_access_token", + "token_type": "CustomToken", + "custom_key": "custom_value", + } # all attributes are preserved + with pytest.raises(UnsupportedTokenTypeError): + serializer.loads(serialized) # but deserialization fails due to unsupported token type diff --git a/tests/unit_tests/test_tokens.py b/tests/unit_tests/test_tokens.py index 8c04e357..0e1705ee 100644 --- a/tests/unit_tests/test_tokens.py +++ b/tests/unit_tests/test_tokens.py @@ -1,9 +1,8 @@ -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone import jwskate import pytest from freezegun import freeze_time -from freezegun.api import FrozenDateTimeFactory from jwskate import ( ExpiredJwt, InvalidClaim, @@ -15,7 +14,7 @@ SignedJwt, ) -from requests_oauth2client import BearerToken, BearerTokenSerializer, IdToken +from requests_oauth2client import BearerToken, IdToken ID_TOKEN = ( "eyJhbGciOiJSUzI1NiIsImtpZCI6Im15X2tleSJ9.eyJhY3IiOiIyIiwiYW1yIjpbInB3ZCIsIm90cCJdLCJhdWQiOiJjbGllbnRfaWQiL" @@ -271,23 +270,6 @@ def test_id_token_attributes() -> None: assert good_id_token.auth_datetime == datetime(2024, 9, 5, 9, 41, 21, tzinfo=timezone.utc) -@pytest.mark.parametrize( - "token", - [ - BearerToken("access_token"), - # note that "expires_at" is calculated when the test is ran, so before `freezer` takes effect - BearerToken("access_token", expires_in=60), - BearerToken("access_token", expires_in=-60), - ], -) -def test_token_serializer(token: BearerToken, freezer: FrozenDateTimeFactory) -> None: - freezer.move_to("2024-08-01") - serializer = BearerTokenSerializer() - candidate = serializer.dumps(token) - freezer.move_to(datetime.now(tz=timezone.utc) + timedelta(days=365)) - assert serializer.loads(candidate) == token - - @freeze_time() def test_expires_in_as_str() -> None: assert BearerToken("access_token", expires_in=60) == BearerToken("access_token", expires_in="60") diff --git a/uv.lock b/uv.lock index 0d65d9e5..824fd6a1 100644 --- a/uv.lock +++ b/uv.lock @@ -699,6 +699,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/36/f4/c6e662dade71f56cd2f3735141b265c3c79293c109549c1e6933b0651ffc/exceptiongroup-1.3.0-py3-none-any.whl", hash = "sha256:4d111e6e0c13d0644cad6ddaa7ed0261a0b36971f6d23e7ec9b4b9097da78a10", size = 16674, upload-time = "2025-05-10T17:42:49.33Z" }, ] +[[package]] +name = "fieldz" +version = "0.1.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/78/c2/9d35a747f439051bff41511178b0b527f1466c7fd4b5a5f7357c136e82bc/fieldz-0.1.3.tar.gz", hash = "sha256:bc21013b2bd5f8a4a782ecef440343ff059fc0a4a2033ae4d616311675260585", size = 18018, upload-time = "2025-09-26T20:42:24.67Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/12/a0/5db990f52226aa4e01e268f285fa56ddf233b3b27b993b4ecc751cf2216f/fieldz-0.1.3-py3-none-any.whl", hash = "sha256:f975fc6f250958966e39504f89bf09bad41479b4c5e0b06b36d13fd92fbac792", size = 18227, upload-time = "2025-09-26T20:42:22.894Z" }, +] + [[package]] name = "filelock" version = "3.19.1" @@ -809,6 +821,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9c/83/3b1d03d36f224edded98e9affd0467630fc09d766c0e56fb1498cbb04a9b/griffe-1.15.0-py3-none-any.whl", hash = "sha256:6f6762661949411031f5fcda9593f586e6ce8340f0ba88921a0f2ef7a81eb9a3", size = 150705, upload-time = "2025-11-10T15:03:13.549Z" }, ] +[[package]] +name = "griffe-fieldz" +version = "0.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "fieldz" }, + { name = "griffe", version = "1.14.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, + { name = "griffe", version = "1.15.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c5/6a/94754bf39fd63ba424c667b2abf0ade78e3878e223591d1fb9c3e8a77bce/griffe_fieldz-0.3.0.tar.gz", hash = "sha256:42e7707dac51d38e26fb7f3f7f51429da9b47e98060bfeb81a4287456d5b8a89", size = 10149, upload-time = "2025-07-30T21:43:10.042Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4d/33/cc527c11132a6274724a04938d50e1ff2b54a5f5943cd0480427571e1adb/griffe_fieldz-0.3.0-py3-none-any.whl", hash = "sha256:52e02fdcbdf6dea3c8c95756d1e0b30861569f871d19437fda702776fde4e64d", size = 6577, upload-time = "2025-07-30T21:43:09.073Z" }, +] + [[package]] name = "identify" version = "2.6.15" @@ -1723,6 +1749,7 @@ dev = [ { name = "virtualenv" }, ] doc = [ + { name = "griffe-fieldz" }, { name = "mkdocs" }, { name = "mkdocs-autorefs" }, { name = "mkdocs-include-markdown-plugin" }, @@ -1766,6 +1793,7 @@ dev = [ { name = "virtualenv", specifier = ">=20.30.0" }, ] doc = [ + { name = "griffe-fieldz", specifier = ">=0.3.0" }, { name = "mkdocs", specifier = ">=1.3.1" }, { name = "mkdocs-autorefs", specifier = ">=0.3.0" }, { name = "mkdocs-include-markdown-plugin", specifier = ">=6" },