99import logging
1010
1111from typing import List
12+ from datetime import datetime , timedelta , timezone
13+ from itertools import count
1214
1315
1416"""
@@ -64,6 +66,18 @@ def generate_google_response_dict(self, default_scopes: List[str]=["https://www.
6466 "expires_in" : 3600
6567 }
6668
69+ _AZURE_FEDERATED_TOKEN_COUNTER = count (1 )
70+ _GCP_STS_TOKEN_COUNTER = count (1 )
71+ _GCP_IMPERSONATED_TOKEN_COUNTER = count (1 )
72+
73+
74+ def _utc_now () -> datetime :
75+ return datetime .now (timezone .utc )
76+
77+
78+ def _isoformat_z (ts : datetime ) -> str :
79+ return ts .astimezone (timezone .utc ).replace (microsecond = 0 ).isoformat ().replace ('+00:00' , 'Z' )
80+
6781def _form_to_str (form : ImmutableMultiDict ) -> str :
6882 return json .dumps (form .to_dict (flat = False ), sort_keys = True )
6983
@@ -84,3 +98,66 @@ def contrived_simple_token():
8498 request_data = request .form
8599 app .logger .info (f'POST /contrived/simple/token request data: { _form_to_str (request_data )} ' )
86100 return json .dumps (_SIMPLE_RESPONSE , sort_keys = True )
101+
102+
103+ @app .route ("/aws/sts" , methods = ['POST' ])
104+ def aws_sts_assume_role_with_web_identity ():
105+ request_data = request .form
106+ action = request_data .get ('Action' )
107+ token = request_data .get ('WebIdentityToken' )
108+ if action != 'AssumeRoleWithWebIdentity' or not token :
109+ return json .dumps ({"msg" : "malformed aws_web_identity request" }, sort_keys = True ), 400
110+ expiration = _isoformat_z (_utc_now () + timedelta (seconds = 1 ))
111+ response_xml = f"""<?xml version=\" 1.0\" encoding=\" UTF-8\" ?>
112+ <AssumeRoleWithWebIdentityResponse xmlns=\" https://sts.amazonaws.com/doc/2011-06-15/\" >
113+ <AssumeRoleWithWebIdentityResult>
114+ <Credentials>
115+ <AccessKeyId>ASIA_MOCK_IDFED</AccessKeyId>
116+ <SecretAccessKey>mock-idfed-secret</SecretAccessKey>
117+ <SessionToken>mock-idfed-session-token</SessionToken>
118+ <Expiration>{ expiration } </Expiration>
119+ </Credentials>
120+ </AssumeRoleWithWebIdentityResult>
121+ </AssumeRoleWithWebIdentityResponse>"""
122+ return response_xml , 200 , {"Content-Type" : "text/xml" }
123+
124+
125+ @app .route ("/gcp/sts/token" , methods = ['POST' ])
126+ def gcp_workload_identity_sts_token ():
127+ request_data = request .form
128+ subject_token = request_data .get ('subject_token' )
129+ if not subject_token :
130+ return json .dumps ({"msg" : "missing subject_token" }, sort_keys = True ), 400
131+ token_index = next (_GCP_STS_TOKEN_COUNTER )
132+ response = {
133+ "access_token" : f"gcp-fed-token-{ token_index } " ,
134+ "issued_token_type" : "urn:ietf:params:oauth:token-type:access_token" ,
135+ "token_type" : "Bearer" ,
136+ "expires_in" : 1 ,
137+ }
138+ return json .dumps (response , sort_keys = True )
139+
140+
141+ @app .route ("/gcp/iamcredentials/generateAccessToken" , methods = ['POST' ])
142+ def gcp_workload_identity_impersonation ():
143+ token_index = next (_GCP_IMPERSONATED_TOKEN_COUNTER )
144+ response = {
145+ "accessToken" : f"gcp-impersonated-token-{ token_index } " ,
146+ "expireTime" : _isoformat_z (_utc_now () + timedelta (seconds = 1 )),
147+ }
148+ return json .dumps (response , sort_keys = True )
149+
150+
151+ @app .route ("/azure/federated/token" , methods = ['POST' ])
152+ def azure_federated_token ():
153+ request_data = request .form
154+ subject_token = request_data .get ('subject_token' )
155+ if not subject_token :
156+ return json .dumps ({"msg" : "missing subject_token" }, sort_keys = True ), 400
157+ token_index = next (_AZURE_FEDERATED_TOKEN_COUNTER )
158+ response = {
159+ "access_token" : f"azure-fed-token-{ token_index } " ,
160+ "token_type" : "Bearer" ,
161+ "expires_in" : 1 ,
162+ }
163+ return json .dumps (response , sort_keys = True )
0 commit comments