diff --git a/votify/spotify_api.py b/votify/spotify_api.py index cb637eb..6098f99 100644 --- a/votify/spotify_api.py +++ b/votify/spotify_api.py @@ -43,6 +43,7 @@ class SpotifyApi: EXTEND_TRACK_COLLECTION_WAIT_TIME = 0.5 SERVER_TIME_URL = "https://open.spotify.com/api/server-time" SESSION_TOKEN_URL = "https://open.spotify.com/api/token" + CLIENT_TOKEN_URL = "https://clienttoken.spotify.com/v1/clienttoken" def __init__( self, @@ -108,10 +109,12 @@ def _get_server_time(self) -> int: check_response(response) return 1e3 * response.json()["serverTime"] - def set_authorization_header(self, token: str) -> None: + def set_token_headers(self, token: str, client_token: str | None = None) -> None: + self.session.headers.update( { "authorization": f"Bearer {token}", + "client-token": client_token } ) @@ -132,10 +135,28 @@ def _setup_authorization_with_totp(self) -> None: authorization_info = response.json() if not authorization_info.get("accessToken"): raise ValueError("Failed to retrieve access token.") - self.set_authorization_header(authorization_info["accessToken"]) + + response = self.session.post(self.CLIENT_TOKEN_URL, + json = { + 'client_data': { + 'client_version': self.CLIENT_VERSION, + 'client_id': authorization_info['clientId'], + 'js_sdk_data': {} + } + }, + headers = { + 'Accept': 'application/json', + } + ) + check_response(response) + client_token = response.json() + if not client_token.get("granted_token"): + raise ValueError("Failed to retrieve granted token.") + + self.set_token_headers(authorization_info["accessToken"], client_token["granted_token"]["token"]) self.session_auth_expire_time = ( authorization_info["accessTokenExpirationTimestampMs"] / 1000 - ) + ) def _setup_authorization(self) -> None: if self.use_device_flow: @@ -146,14 +167,14 @@ def _setup_authorization(self) -> None: def _setup_authorization_with_device_flow(self) -> None: device_flow = SpotifyDeviceFlow(self.sp_dc) token_data = device_flow.get_token() - self.set_authorization_header(token_data["access_token"]) + self.set_token_headers(token_data["access_token"]) self.session_auth_expire_time = ( int(time.time()) + token_data["expires_in"] ) * 1000 def _refresh_session_auth(self) -> None: timestamp_session_expire = int(self.session_auth_expire_time) - timestamp_now = time.time() * 1000 + timestamp_now = time.time() if timestamp_now < timestamp_session_expire: return self._setup_authorization()