@@ -63,12 +63,15 @@ def __init__(self, host, port=1883, username=None, password=None, gateway=None,
6363 host , device_telemetry_rate_limit , device_telemetry_dp_rate_limit )
6464 self .__device_messages_rate_limit = RateLimit .get_rate_limit_by_host (host , device_messages_rate_limit )
6565
66+ self ._devices_connected_through_gateway_telemetry_messages_rate_limit = RateLimit (self .__device_telemetry_rate_limit , "Rate limit for devices connected through gateway telemetry messages" )
67+ self ._devices_connected_through_gateway_telemetry_datapoints_rate_limit = RateLimit (self .__device_telemetry_dp_rate_limit , "Rate limit for devices connected through gateway telemetry data points" )
68+ self ._devices_connected_through_gateway_messages_rate_limit = RateLimit (self .__device_messages_rate_limit , "Rate limit for devices connected through gateway messages" )
69+
6670 self .service_configuration_callback = self .__on_service_configuration
6771 self .quality_of_service = quality_of_service
6872 self .__max_sub_id = 0
6973 self .__sub_dict = {}
7074 self .__connected_devices = set ("*" )
71- self ._devices_rate_limit = {}
7275 self .devices_server_side_rpc_request_handler = None
7376 self ._client .on_connect = self ._on_connect
7477 self ._client .on_message = self ._on_message
@@ -125,6 +128,7 @@ def _on_decoded_message(self, content, message, **kwargs):
125128 if message .topic .startswith (GATEWAY_ATTRIBUTES_RESPONSE_TOPIC ):
126129 with self ._lock :
127130 req_id = content ["id" ]
131+ self ._devices_connected_through_gateway_messages_rate_limit .increase_rate_limit_counter (1 )
128132 # pop callback and use it
129133 if self ._attr_request_dict [req_id ]:
130134 callback = self ._attr_request_dict .pop (req_id )
@@ -143,6 +147,7 @@ def _on_decoded_message(self, content, message, **kwargs):
143147 # callbacks for device. in this case callback executes for all attributes in message
144148 if content .get ("device" ) is None :
145149 return
150+ self ._devices_connected_through_gateway_messages_rate_limit .increase_rate_limit_counter (1 )
146151 target = content ["device" ] + "|*"
147152 if self .__sub_dict .get (target ):
148153 for device in self .__sub_dict [target ]:
@@ -154,6 +159,7 @@ def _on_decoded_message(self, content, message, **kwargs):
154159 for device in self .__sub_dict [target ]:
155160 self .__sub_dict [target ][device ](content )
156161 elif message .topic == GATEWAY_RPC_TOPIC :
162+ self ._devices_connected_through_gateway_messages_rate_limit .increase_rate_limit_counter (1 )
157163 if self .devices_server_side_rpc_request_handler :
158164 self .devices_server_side_rpc_request_handler (self , content )
159165
@@ -162,26 +168,23 @@ def __request_attributes(self, device, keys, callback, type_is_client=False):
162168 log .error ("There are no keys to request" )
163169 return False
164170
165- ts_in_millis = int (round (time () * 1000 ))
166171 attr_request_number = self ._add_attr_request_callback (callback )
167172 msg = {"keys" : keys ,
168173 "device" : device ,
169174 "client" : type_is_client ,
170175 "id" : attr_request_number }
171176 info = self ._send_device_request (TBSendMethod .PUBLISH , device , topic = GATEWAY_ATTRIBUTES_REQUEST_TOPIC , data = msg ,
172177 qos = 1 )
173- self ._add_timeout ( attr_request_number , ts_in_millis + 30000 )
178+ self .__attrs_request_timeout [ attr_request_number ] = int ( time ()) + 20
174179 return info
175180
176181 def _send_device_request (self , _type , device_name , ** kwargs ):
177182 if _type == TBSendMethod .PUBLISH :
178- if self ._devices_rate_limit .get (device_name ) is None :
179- self ._add_device_rate_limit (device_name )
180- device_msg_rate_limit = self ._devices_rate_limit [device_name ]['msg_rate_limit' ]
181- device_dp_rate_limit = self ._devices_rate_limit [device_name ]['dp_rate_limit' ]
183+ device_msg_rate_limit = self ._devices_connected_through_gateway_messages_rate_limit
184+ device_dp_rate_limit = self .EMPTY_RATE_LIMIT
182185 if kwargs .get ('topic' ) == GATEWAY_TELEMETRY_TOPIC :
183- device_msg_rate_limit = self ._devices_rate_limit [ device_name ][ 'telemetry_msg_rate_limit' ]
184- device_dp_rate_limit = self ._devices_rate_limit [ device_name ][ 'telemetry_dp_rate_limit' ]
186+ device_msg_rate_limit = self ._devices_connected_through_gateway_telemetry_messages_rate_limit
187+ device_dp_rate_limit = self ._devices_connected_through_gateway_telemetry_datapoints_rate_limit
185188 info = self ._publish_data (** kwargs , device = device_name ,
186189 msg_rate_limit = device_msg_rate_limit ,
187190 dp_rate_limit = device_dp_rate_limit )
@@ -242,16 +245,6 @@ def gw_subscribe_to_attribute(self, device, attribute, callback):
242245 return False
243246
244247 with self ._lock :
245- # if device == '*':
246- # for device_name in self._devices_rate_limit.keys():
247- # is_reached = self.check_device_rate_limit(device_name)
248- # if is_reached:
249- # return is_reached
250- # else:
251- # is_reached = self.check_device_rate_limit(device)
252- # if is_reached:
253- # return is_reached
254-
255248 self .__max_sub_id += 1
256249 key = device + "|" + attribute
257250 if key not in self .__sub_dict :
@@ -299,22 +292,6 @@ def gw_claim(self, device_name, secret_key, duration, claiming_request=None):
299292 data = claiming_request , qos = self .quality_of_service )
300293 return info
301294
302- def _add_device_rate_limit (self , device_name ):
303- telemetry_rate_limit = RateLimit (self .__device_telemetry_rate_limit , "Rate limit for device %s telemetry messages" % device_name )
304- telemetry_dp_rate_limit = RateLimit (self .__device_telemetry_dp_rate_limit , "Rate limit for device %s telemetry data points" % device_name )
305- msg_dp_rate_limit = self .EMPTY_RATE_LIMIT
306- msg_rate_limit = RateLimit (self .__device_messages_rate_limit , "Rate limit for device %s messages" % device_name )
307- self ._devices_rate_limit [device_name ] = {
308- 'msg_rate_limit' : msg_rate_limit ,
309- 'dp_rate_limit' : msg_dp_rate_limit ,
310- 'telemetry_msg_rate_limit' : telemetry_rate_limit ,
311- 'telemetry_dp_rate_limit' : telemetry_dp_rate_limit
312- }
313-
314- def _change_devices_rate_limit (self , rate_limit_key , rate_limit_value , percentage = 50 ):
315- for device in self ._devices_rate_limit .values ():
316- device [rate_limit_key ].set_limit (rate_limit_value , percentage = percentage )
317-
318295 def __on_service_configuration (self , _ , response , * args , ** kwargs ):
319296 if "error" in response :
320297 log .warning ("Timeout while waiting for service configuration!, session will use default configuration." )
@@ -327,23 +304,9 @@ def __on_service_configuration(self, _, response, *args, **kwargs):
327304 super ().on_service_configuration (_ , {'rateLimit' : gateway_device_itself_rate_limit_config , ** service_config }, * args , ** kwargs )
328305
329306 if gateway_devices_rate_limit_config .get ("messages" ):
330- # change global rate limit for future devices
331- self .__device_messages_rate_limit = gateway_devices_rate_limit_config .get ('messages' )
332-
333- # change rate limit for already connected devices
334- self ._change_devices_rate_limit ('msg_rate_limit' , gateway_devices_rate_limit_config .get ('messages' ))
307+ self ._devices_connected_through_gateway_messages_rate_limit .set_limit (gateway_devices_rate_limit_config .get ("messages" ))
335308 if gateway_devices_rate_limit_config .get ('telemetryMessages' ):
336- # change global rate limit for future devices
337- self .__device_telemetry_rate_limit = gateway_devices_rate_limit_config .get ('telemetryMessages' )
338-
339- # change rate limit for already connected devices
340- self ._change_devices_rate_limit ('telemetry_msg_rate_limit' ,
341- gateway_devices_rate_limit_config .get ('telemetryMessages' ))
309+ self ._devices_connected_through_gateway_telemetry_messages_rate_limit .set_limit (gateway_devices_rate_limit_config .get ('telemetryMessages' ))
342310 if gateway_devices_rate_limit_config .get ('telemetryDataPoints' ):
343- # change global rate limit for future devices
344- self .__device_telemetry_dp_rate_limit = gateway_devices_rate_limit_config .get ('telemetryDataPoints' )
345-
346- # change rate limit for already connected devices
347- self ._change_devices_rate_limit ('telemetry_dp_rate_limit' ,
348- gateway_devices_rate_limit_config .get ('telemetryDataPoints' ))
311+ self ._devices_connected_through_gateway_telemetry_datapoints_rate_limit .set_limit (gateway_devices_rate_limit_config .get ('telemetryDataPoints' ))
349312 self .rate_limits_received = True
0 commit comments