|
6 | 6 | from oauthlib.oauth2 import WebApplicationClient, InsecureTransportError
|
7 | 7 | from oauthlib.oauth2 import LegacyApplicationClient
|
8 | 8 | from oauthlib.oauth2 import TokenExpiredError, is_secure_transport
|
| 9 | +from oauthlib.oauth2 import UnsupportedTokenTypeError |
| 10 | +from oauthlib.oauth2 import TemporarilyUnavailableError, ServerError |
9 | 11 | import requests
|
10 | 12 |
|
11 | 13 | log = logging.getLogger(__name__)
|
@@ -94,8 +96,10 @@ def __init__(
|
94 | 96 | self.compliance_hook = {
|
95 | 97 | "access_token_response": set(),
|
96 | 98 | "refresh_token_response": set(),
|
| 99 | + "revoke_token_response": set(), |
97 | 100 | "protected_request": set(),
|
98 | 101 | "refresh_token_request": set(),
|
| 102 | + "revoke_token_request": set(), |
99 | 103 | "access_token_request": set(),
|
100 | 104 | }
|
101 | 105 |
|
@@ -481,6 +485,85 @@ def refresh_token(
|
481 | 485 | self.token["refresh_token"] = refresh_token
|
482 | 486 | return self.token
|
483 | 487 |
|
| 488 | + def revoke_token( |
| 489 | + self, |
| 490 | + token_url, |
| 491 | + token=None, |
| 492 | + token_type=None, |
| 493 | + body="", |
| 494 | + auth=None, |
| 495 | + timeout=None, |
| 496 | + headers=None, |
| 497 | + verify=None, |
| 498 | + proxies=None, |
| 499 | + **kwargs |
| 500 | + ): |
| 501 | + """Revoke a token pair using a token. |
| 502 | +
|
| 503 | + :param token_url: The token endpoint, must be HTTPS. |
| 504 | + :param token: The token to revoke. |
| 505 | + :param token_type: The type of token to revoke. |
| 506 | + :param body: Optional application/x-www-form-urlencoded body to add the |
| 507 | + include in the token request. Prefer kwargs over body. |
| 508 | + :param auth: An auth tuple or method as accepted by `requests`. |
| 509 | + :param timeout: Timeout of the request in seconds. |
| 510 | + :param headers: A dict of headers to be used by `requests`. |
| 511 | + :param verify: Verify SSL certificate. |
| 512 | + :param proxies: The `proxies` argument will be passed to `requests`. |
| 513 | + :param kwargs: Extra parameters to include in the token request. |
| 514 | + :return: A token dict |
| 515 | + """ |
| 516 | + if not token_url: |
| 517 | + raise ValueError("No token endpoint set for revoke.") |
| 518 | + |
| 519 | + if not is_secure_transport(token_url): |
| 520 | + raise InsecureTransportError() |
| 521 | + |
| 522 | + token = token or self.token.get("token") |
| 523 | + token_type = token_type or self.token.get("token_type") |
| 524 | + |
| 525 | + _request_headers = headers or {} |
| 526 | + |
| 527 | + if token_type: |
| 528 | + (url, _headers, body) = self._client.prepare_token_revocation_request( |
| 529 | + token_url, token, token_type, body=body, scope=self.scope, **kwargs) |
| 530 | + else: |
| 531 | + (url, _headers, body) = self._client.prepare_revocation_request( |
| 532 | + token_url, token, body=body, scope=self.scope, **kwargs) |
| 533 | + _request_headers.update(_headers) |
| 534 | + log.debug("Prepared revocation request %s", body) |
| 535 | + |
| 536 | + for hook in self.compliance_hook["revoke_token_request"]: |
| 537 | + log.debug("Invoking revoke_token_request hook %s.", hook) |
| 538 | + url, _request_headers, body = hook(url, _headers, body) |
| 539 | + |
| 540 | + r = self.post( |
| 541 | + url, |
| 542 | + data=dict(urldecode(body)), |
| 543 | + auth=auth, |
| 544 | + timeout=timeout, |
| 545 | + headers=_request_headers, |
| 546 | + verify=verify, |
| 547 | + withhold_token=True, |
| 548 | + proxies=proxies, |
| 549 | + ) |
| 550 | + log.debug("Request to revoke token completed with status %s.", r.status_code) |
| 551 | + log.debug("Response headers were %s and content %s.", r.headers, r.text) |
| 552 | + log.debug( |
| 553 | + "Invoking %d token response hooks.", |
| 554 | + len(self.compliance_hook["revoke_token_response"]), |
| 555 | + ) |
| 556 | + for hook in self.compliance_hook["revoke_token_response"]: |
| 557 | + log.debug("Invoking hook %s.", hook) |
| 558 | + r = hook(r) |
| 559 | + |
| 560 | + if not r.ok and r.status_code == 400: |
| 561 | + if 'unsupported_token_type' in r.text: |
| 562 | + raise UnsupportedTokenTypeError("Revocation not supported by server") |
| 563 | + raise ServerError('Server error') |
| 564 | + elif r.code == 503: |
| 565 | + raise TemporarilyUnavailableError("Service unavailable") |
| 566 | + |
484 | 567 | def request(
|
485 | 568 | self,
|
486 | 569 | method,
|
@@ -555,9 +638,11 @@ def register_compliance_hook(self, hook_type, hook):
|
555 | 638 | Available hooks are:
|
556 | 639 | access_token_response invoked before token parsing.
|
557 | 640 | refresh_token_response invoked before refresh token parsing.
|
| 641 | + revoke_token_response invoked after token revocation. |
558 | 642 | protected_request invoked before making a request.
|
559 | 643 | access_token_request invoked before making a token fetch request.
|
560 | 644 | refresh_token_request invoked before making a refresh request.
|
| 645 | + revoke_token_request invoked before making a revoke request. |
561 | 646 |
|
562 | 647 | If you find a new hook is needed please send a GitHub PR request
|
563 | 648 | or open an issue.
|
|
0 commit comments