@@ -1113,59 +1113,6 @@ async def _frequent_checks(self):
11131113 pass
11141114 logger .debug (f"BybitWebSocketApiManager._frequent_checks() - Leaving thread ..." )
11151115
1116- async def _ping_listen_key (self , stream_id = None ):
1117- logger .info (f"BybitWebSocketApiManager._ping_listen_key(stream_id={ stream_id } ) - asyncio task running!" )
1118- if isinstance (self .stream_list [stream_id ]['markets' ], str ):
1119- with self .stream_list_lock :
1120- logger .debug (f"BybitWebSocketApiManager._ping_listen_key() - `stream_list_lock` was entered!" )
1121- self .stream_list [stream_id ]['markets' ] = [self .stream_list [stream_id ]['markets' ], ]
1122- logger .debug (f"BybitWebSocketApiManager._ping_listen_key() - Leaving `stream_list_lock`!" )
1123- if isinstance (self .stream_list [stream_id ]['channels' ], str ):
1124- with self .stream_list_lock :
1125- logger .debug (f"BybitWebSocketApiManager._ping_listen_key() - `stream_list_lock` was entered!" )
1126- self .stream_list [stream_id ]['channels' ] = [self .stream_list [stream_id ]['channels' ], ]
1127- logger .debug (f"BybitWebSocketApiManager._ping_listen_key() - Leaving `stream_list_lock`!" )
1128- while self .stream_list [stream_id ]['status' ] != "stopped" \
1129- and not self .stream_list [stream_id ]['status' ].startswith ("crashed" ):
1130- await asyncio .sleep (2 )
1131- if self .stream_list [stream_id ]['keep_listen_key_alive' ] is True \
1132- and (self .stream_list [stream_id ]['start_time' ] +
1133- self .stream_list [stream_id ]['listen_key_cache_time' ]) < time .time () \
1134- and (self .stream_list [stream_id ]['last_static_ping_listen_key' ] +
1135- self .stream_list [stream_id ]['listen_key_cache_time' ]) < time .time ():
1136- try :
1137- response , bybit_api_status = self .restclient .keepalive_listen_key (stream_id )
1138- if bybit_api_status is not None :
1139- self .bybit_api_status = bybit_api_status
1140- with self .stream_list_lock :
1141- logger .debug (f"BybitWebSocketApiManager._ping_listen_key() - `stream_list_lock` was entered!" )
1142- self .stream_list [stream_id ]['last_static_ping_listen_key' ] = time .time ()
1143- logger .debug (f"BybitWebSocketApiManager._ping_listen_key() - Leaving `stream_list_lock`!" )
1144- self .set_heartbeat (stream_id )
1145- logger .info (f"BybitWebSocketApiManager._ping_listen_key(stream_id={ stream_id } ) - pinged "
1146- f"listen_key!" )
1147- sleep_till = time .time () + self .listen_key_refresh_interval
1148- while sleep_till > time .time () \
1149- and self .stream_list [stream_id ]['status' ] != "stopped" \
1150- and not self .stream_list [stream_id ]['status' ].startswith ("crashed" ):
1151- await asyncio .sleep (2 )
1152- except Exception as error_msg :
1153- logger .critical (f"BybitWebSocketApiManager._ping_listen_key(stream_id={ stream_id } ) - "
1154- f"BybitAPIException - Not able to ping the listen_key - error: { error_msg } " )
1155- if "IP banned" in str (error_msg ):
1156- match = re .search (r"until (\d+)" , str (error_msg ))
1157- if match :
1158- banned_timeframe = (int (match .group (1 ))/ 1000 ) - time .time ()
1159- logger .critical (f"BybitWebSocketApiManager._ping_listen_key(stream_id="
1160- f"{ stream_id } ) - Wait for { banned_timeframe } seconds until the "
1161- f"IP ban has expired." )
1162- while banned_timeframe > 0 \
1163- and self .stream_list [stream_id ]['status' ] != "stopped" \
1164- and not self .stream_list [stream_id ]['status' ].startswith ("crashed" ):
1165- await asyncio .sleep (2 )
1166- banned_timeframe = (int (match .group (1 )) / 1000 ) - time .time ()
1167- logger .info (f"BybitWebSocketApiManager._ping_listen_key(stream_id={ stream_id } ) - asyncio task stopped!" )
1168-
11691116 @staticmethod
11701117 def _handle_task_result (task : asyncio .Task ) -> None :
11711118 """
@@ -2136,65 +2083,6 @@ def get_number_of_all_subscriptions(self):
21362083 return self .all_subscriptions_number
21372084 return subscriptions
21382085
2139- def get_number_of_free_subscription_slots (self , stream_id ):
2140- """
2141- Get the number of free subscription slots (max allowed subscriptions - subscriptions) of a specific stream
2142-
2143- :return: int
2144- """
2145- try :
2146- free_slots = self .max_subscriptions_per_stream - self .stream_list [stream_id ]['subscriptions' ]
2147- except KeyError as error_msg :
2148- logger .debug (f"BybitWebSocketApiManager.get_number_of_free_subscription_slots() - KeyError: { error_msg } " )
2149- return None
2150- return free_slots
2151-
2152- def get_listen_key_from_restclient (self , stream_id ):
2153- """
2154- Get a new or cached (<30m) listen_key
2155-
2156- :param stream_id: provide a stream_id
2157- :type stream_id: str
2158- """
2159- try :
2160- if (self .stream_list [stream_id ]['start_time' ] + self .stream_list [stream_id ]['listen_key_cache_time' ]) > \
2161- time .time () or (self .stream_list [stream_id ]['last_static_ping_listen_key' ] +
2162- self .stream_list [stream_id ]['listen_key_cache_time' ]) > time .time ():
2163- # listen_key is not older than 30 min
2164- if self .stream_list [stream_id ]['listen_key' ] is not None :
2165- response = {'listenKey' : self .stream_list [stream_id ]['listen_key' ]}
2166- return response
2167- except KeyError as error_msg :
2168- logger .debug (f"BybitWebSocketApiManager.get_listen_key_from_restclient() - KeyError: { error_msg } " )
2169- return False
2170- # no cached listen_key or listen_key is older than 30 min
2171- # acquire a new listen_key:
2172- try :
2173- response , bybit_api_status = self .restclient .get_listen_key (stream_id )
2174- if bybit_api_status is not None :
2175- self .bybit_api_status = bybit_api_status
2176- except ResourceWarning as error_msg :
2177- logger .error (f"BybitWebSocketApiManager.get_listen_key_from_restclient() - ResourceWarning: { error_msg } " )
2178- return False
2179- if response :
2180- # save and return the valid listen_key
2181- try :
2182- with self .stream_list_lock :
2183- logger .debug (f"BybitWebSocketApiManager.get_listen_key_from_restclient() - `stream_list_lock` "
2184- f"was entered!" )
2185- self .stream_list [stream_id ]['listen_key' ] = str (response ['listenKey' ])
2186- logger .debug (f"BybitWebSocketApiManager.get_listen_key_from_restclient() - Leaving "
2187- f"`stream_list_lock`!" )
2188- return response
2189- except KeyError :
2190- # no valid listen_key, but a response from endpoint
2191- return response
2192- except TypeError :
2193- return response
2194- else :
2195- # no valid listen_key
2196- return False
2197-
21982086 def get_most_receives_per_second (self ):
21992087 """
22002088 Get the highest total receives per second value
0 commit comments