|
12 | 12 |
|
13 | 13 | class TokenCacheTestCase(unittest.TestCase): |
14 | 14 |
|
| 15 | + @staticmethod |
| 16 | + def build_id_token(sub="sub", oid="oid", preferred_username="me", **kwargs): |
| 17 | + return "header.%s.signature" % base64.b64encode(json.dumps(dict({ |
| 18 | + "sub": sub, |
| 19 | + "oid": oid, |
| 20 | + "preferred_username": preferred_username, |
| 21 | + }, **kwargs)).encode()).decode('utf-8') |
| 22 | + |
| 23 | + @staticmethod |
| 24 | + def build_response( # simulate a response from AAD |
| 25 | + uid="uid", utid="utid", # They will form client_info |
| 26 | + access_token=None, expires_in=3600, token_type="some type", |
| 27 | + refresh_token=None, |
| 28 | + foci=None, |
| 29 | + id_token=None, # or something generated by build_id_token() |
| 30 | + error=None, |
| 31 | + ): |
| 32 | + response = { |
| 33 | + "client_info": base64.b64encode(json.dumps({ |
| 34 | + "uid": uid, "utid": utid, |
| 35 | + }).encode()).decode('utf-8'), |
| 36 | + } |
| 37 | + if error: |
| 38 | + response["error"] = error |
| 39 | + if access_token: |
| 40 | + response.update({ |
| 41 | + "access_token": access_token, |
| 42 | + "expires_in": expires_in, |
| 43 | + "token_type": token_type, |
| 44 | + }) |
| 45 | + if refresh_token: |
| 46 | + response["refresh_token"] = refresh_token |
| 47 | + if id_token: |
| 48 | + response["id_token"] = id_token |
| 49 | + if foci: |
| 50 | + response["foci"] = foci |
| 51 | + return response |
| 52 | + |
15 | 53 | def setUp(self): |
16 | 54 | self.cache = TokenCache() |
17 | 55 |
|
18 | 56 | def testAdd(self): |
19 | | - client_info = base64.b64encode(b''' |
20 | | - {"uid": "uid", "utid": "utid"} |
21 | | - ''').decode('utf-8') |
22 | | - id_token = "header.%s.signature" % base64.b64encode(b'''{ |
23 | | - "sub": "subject", |
24 | | - "oid": "object1234", |
25 | | - "preferred_username": "John Doe" |
26 | | - }''').decode('utf-8') |
| 57 | + id_token = self.build_id_token(oid="object1234", preferred_username="John Doe") |
27 | 58 | self.cache.add({ |
28 | 59 | "client_id": "my_client_id", |
29 | 60 | "scope": ["s2", "s1", "s3"], # Not in particular order |
30 | 61 | "token_endpoint": "https://login.example.com/contoso/v2/token", |
31 | | - "response": { |
32 | | - "access_token": "an access token", |
33 | | - "token_type": "some type", |
34 | | - "expires_in": 3600, |
35 | | - "refresh_token": "a refresh token", |
36 | | - "client_info": client_info, |
37 | | - "id_token": id_token, |
38 | | - }, |
| 62 | + "response": self.build_response( |
| 63 | + uid="uid", utid="utid", # client_info |
| 64 | + expires_in=3600, access_token="an access token", |
| 65 | + id_token=id_token, refresh_token="a refresh token"), |
39 | 66 | }, now=1000) |
40 | 67 | self.assertEqual( |
41 | 68 | { |
|
0 commit comments