@@ -148,26 +148,35 @@ def get(self):
148148
149149
150150class RateLimit :
151- def __init__ (self , rate_limit , percentage = 80 ):
152- self .__start_time = monotonic ()
151+ def __init__ (self , rate_limit , name = None , percentage = 80 ):
153152 self .__no_limit = False
154- if '' .join (c for c in rate_limit if c not in [' ' , ',' , ';' ]) in ("" , "0:0" ):
155- self .__no_limit = True
156153 self .__rate_limit_dict = {}
157154 self .__lock = RLock ()
158- rate_configs = rate_limit .split (";" )
159- if "," in rate_limit :
160- rate_configs = rate_limit .split ("," )
161- for rate in rate_configs :
162- if rate == "" :
163- continue
164- rate = rate .split (":" )
165- self .__rate_limit_dict [int (rate [1 ])] = {"counter" : 0 ,
166- "start" : int (monotonic ()),
167- "limit" : int (int (rate [0 ]) * percentage / 100 )}
168- log .debug ("Rate limit set to values: " )
169155 self .__minimal_timeout = DEFAULT_TIMEOUT
170156 self .__minimal_limit = 1000000000
157+ from_dict = isinstance (rate_limit , dict )
158+ if from_dict :
159+ self .__rate_limit_dict = rate_limit .get ('rateLimit' , rate_limit )
160+ name = rate_limit .get ('name' , name )
161+ percentage = rate_limit .get ('percentage' , percentage )
162+ self .no_limit = rate_limit .get ('no_limit' , False )
163+ self .name = name
164+ self .percentage = percentage
165+ self .__start_time = int (monotonic ())
166+ if not from_dict :
167+ if '' .join (c for c in rate_limit if c not in [' ' , ',' , ';' ]) in ("" , "0:0" ):
168+ self .__no_limit = True
169+ rate_configs = rate_limit .split (";" )
170+ if "," in rate_limit :
171+ rate_configs = rate_limit .split ("," )
172+ for rate in rate_configs :
173+ if rate == "" :
174+ continue
175+ rate = rate .split (":" )
176+ self .__rate_limit_dict [int (rate [1 ])] = {"counter" : 0 ,
177+ "start" : int (monotonic ()),
178+ "limit" : int (int (rate [0 ]) * self .percentage / 100 )}
179+ log .debug ("Rate limit set to values: " )
171180 with self .__lock :
172181 if not self .__no_limit :
173182 for rate_limit_time in self .__rate_limit_dict :
@@ -209,10 +218,11 @@ def get_minimal_timeout(self):
209218 def has_limit (self ):
210219 return not self .__no_limit
211220
212- def set_limit (self , rate_limit , percentage = 0 ):
221+ def set_limit (self , rate_limit , percentage = 80 ):
213222 with self .__lock :
214223 old_rate_limit_dict = deepcopy (self .__rate_limit_dict )
215224 self .__rate_limit_dict = {}
225+ self .percentage = percentage if percentage != 0 else self .percentage
216226 rate_configs = rate_limit .split (";" )
217227 if "," in rate_limit :
218228 rate_configs = rate_limit .split ("," )
@@ -222,9 +232,10 @@ def set_limit(self, rate_limit, percentage=0):
222232 rate = rate .split (":" )
223233 rate_limit_time = int (rate [1 ])
224234 limit = int (int (rate [0 ]) * percentage / 100 )
225- self .__rate_limit_dict [int (rate [1 ])] = {"counter" : old_rate_limit_dict .get (rate_limit_time , {}).get ('counter' , 0 ),
226- "start" : self .__rate_limit_dict .get (rate_limit_time , {}).get ('start' , int (monotonic ())),
227- "limit" : limit }
235+ self .__rate_limit_dict [int (rate [1 ])] = {
236+ "counter" : old_rate_limit_dict .get (rate_limit_time , {}).get ('counter' , 0 ),
237+ "start" : self .__rate_limit_dict .get (rate_limit_time , {}).get ('start' , int (monotonic ())),
238+ "limit" : limit }
228239 if rate_limit_time < self .__minimal_limit :
229240 self .__minimal_timeout = rate_limit_time + 1
230241 if limit < self .__minimal_limit :
@@ -235,6 +246,14 @@ def set_limit(self, rate_limit, percentage=0):
235246 for rate_limit_time in self .__rate_limit_dict :
236247 log .debug ("Time: %s, Limit: %s" , rate_limit_time , self .__rate_limit_dict [rate_limit_time ]["limit" ])
237248
249+ def __dict__ (self ):
250+ return {
251+ "rateLimit" : self .__rate_limit_dict ,
252+ "name" : self .name ,
253+ "percentage" : self .percentage ,
254+ "no_limit" : self .__no_limit
255+ }
256+
238257 @staticmethod
239258 def get_rate_limits_by_host (host , rate_limit , dp_rate_limit ):
240259 rate_limit = RateLimit .get_rate_limit_by_host (host , rate_limit )
@@ -271,11 +290,11 @@ def get_rate_limit_by_host(host, rate_limit):
271290 def get_dp_rate_limit_by_host (host , dp_rate_limit ):
272291 if dp_rate_limit == "DEFAULT_TELEMETRY_DP_RATE_LIMIT" :
273292 if "thingsboard.cloud" in host :
274- dp_rate_limit = "5 :1,60 :60,"
293+ dp_rate_limit = "10 :1,300 :60,"
275294 elif "tb" in host and "cloud" in host :
276- dp_rate_limit = "5 :1,60 :60,"
295+ dp_rate_limit = "10 :1,300 :60,"
277296 elif "demo.thingsboard.io" in host :
278- dp_rate_limit = "5 :1,60 :60,"
297+ dp_rate_limit = "10 :1,300 :60,"
279298 else :
280299 dp_rate_limit = "0:0,"
281300 else :
@@ -287,7 +306,7 @@ def get_dp_rate_limit_by_host(host, dp_rate_limit):
287306class TBDeviceMqttClient :
288307 """ThingsBoard MQTT client. This class provides interface to send data to ThingsBoard and receive data from"""
289308
290- EMPTY_RATE_LIMIT = RateLimit ('0:0,' )
309+ EMPTY_RATE_LIMIT = RateLimit ('0:0,' , "EMPTY_RATE_LIMIT" )
291310
292311 def __init__ (self , host , port = 1883 , username = None , password = None , quality_of_service = None , client_id = "" ,
293312 chunk_size = 0 , messages_rate_limit = "DEFAULT_MESSAGES_RATE_LIMIT" ,
@@ -325,10 +344,10 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
325344 telemetry_dp_rate_limit )
326345 messages_rate_limit = RateLimit .get_rate_limit_by_host (self .__host , messages_rate_limit )
327346
328- self ._messages_rate_limit = RateLimit (messages_rate_limit )
329- self .__telemetry_rate_limit = RateLimit (telemetry_rate_limit )
330- self .__telemetry_dp_rate_limit = RateLimit (telemetry_dp_rate_limit )
331- self ._client .max_inflight_messages_set (self .__telemetry_rate_limit .get_minimal_limit ())
347+ self ._messages_rate_limit = RateLimit (messages_rate_limit , "Rate limit for messages" )
348+ self ._telemetry_rate_limit = RateLimit (telemetry_rate_limit , "Rate limit for telemetry messages" )
349+ self ._telemetry_dp_rate_limit = RateLimit (telemetry_dp_rate_limit , "Rate limit for telemetry data points" )
350+ self ._client .max_inflight_messages_set (self ._telemetry_rate_limit .get_minimal_limit ())
332351 self .__attrs_request_timeout = {}
333352 self .__timeout_thread = Thread (target = self .__timeout_check )
334353 self .__timeout_thread .daemon = True
@@ -651,13 +670,13 @@ def on_service_configuration(self, _, response, *args, **kwargs):
651670 if rate_limits_config .get ('messages' ):
652671 self ._messages_rate_limit .set_limit (rate_limits_config .get ('messages' ), percentage = 80 )
653672 if rate_limits_config .get ('telemetryMessages' ):
654- self .__telemetry_rate_limit .set_limit (rate_limits_config .get ('telemetryMessages' ), percentage = 80 )
673+ self ._telemetry_rate_limit .set_limit (rate_limits_config .get ('telemetryMessages' ), percentage = 80 )
655674 if rate_limits_config .get ('telemetryDataPoints' ):
656- self .__telemetry_dp_rate_limit .set_limit (rate_limits_config .get ('telemetryDataPoints' ), percentage = 80 )
675+ self ._telemetry_dp_rate_limit .set_limit (rate_limits_config .get ('telemetryDataPoints' ), percentage = 80 )
657676 if service_config .get ('maxInflightMessages' ):
658677 max_inflight_messages = int (min (self ._messages_rate_limit .get_minimal_limit (),
659- self .__telemetry_rate_limit .get_minimal_limit (),
660- service_config .get ('maxInflightMessages' , 100 )) * 80 / 100 )
678+ self ._telemetry_rate_limit .get_minimal_limit (),
679+ service_config .get ('maxInflightMessages' , 100 )) * 80 / 100 )
661680 self .max_inflight_messages_set (max_inflight_messages )
662681 if service_config .get ('maxPayloadSize' ):
663682 self .max_payload_size = int (int (service_config .get ('maxPayloadSize' )) * 80 / 100 )
@@ -677,6 +696,7 @@ def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_lim
677696 disconnected = False
678697 limit_reached_check = True
679698 log_posted = False
699+ waited = False
680700 while limit_reached_check :
681701 limit_reached_check = (message_rate_limit .check_limit_reached ()
682702 or (dp_rate_limit is not None and dp_rate_limit .check_limit_reached (amount = amount ))
@@ -695,11 +715,15 @@ def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_lim
695715 return TBPublishInfo (paho .MQTTMessageInfo (None ))
696716 if not log_posted and limit_reached_check :
697717 if isinstance (limit_reached_check , int ):
698- log .debug ("Rate limit reached for %i seconds, waiting for rate limit to be released..." , limit_reached_check )
718+ log .debug ("Rate limit reached for %i seconds, waiting for rate limit to be released..." ,
719+ limit_reached_check )
720+ waited = True
699721 else :
700722 log .debug ("Waiting for rate limit to be released..." )
701723 log_posted = True
702724 sleep (.01 )
725+ if waited :
726+ log .debug ("Rate limit released, sending data to ThingsBoard..." )
703727
704728 def wait_until_current_queued_messages_processed (self ):
705729 previous_notification_time = 0
@@ -718,12 +742,12 @@ def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
718742 msg_rate_limit = None , dp_rate_limit = None ):
719743 if msg_rate_limit is None :
720744 if kwargs .get ('topic' ) == TELEMETRY_TOPIC :
721- msg_rate_limit = self .__telemetry_rate_limit
745+ msg_rate_limit = self ._telemetry_rate_limit
722746 else :
723747 msg_rate_limit = self ._messages_rate_limit
724748 if dp_rate_limit is None :
725749 if kwargs .get ('topic' ) == TELEMETRY_TOPIC :
726- dp_rate_limit = self .__telemetry_dp_rate_limit
750+ dp_rate_limit = self ._telemetry_dp_rate_limit
727751 else :
728752 dp_rate_limit = self .EMPTY_RATE_LIMIT
729753 if msg_rate_limit .has_limit () or dp_rate_limit .has_limit ():
@@ -758,7 +782,7 @@ def __get_rate_limits_by_topic(self, topic, device=None, msg_rate_limit=None, dp
758782 return msg_rate_limit , dp_rate_limit
759783 else :
760784 if topic == TELEMETRY_TOPIC :
761- return self .__telemetry_rate_limit , self .__telemetry_dp_rate_limit
785+ return self ._telemetry_rate_limit , self ._telemetry_dp_rate_limit
762786 else :
763787 return self ._messages_rate_limit , None
764788
@@ -771,22 +795,20 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
771795 attributes_format = topic .endswith ('attributes' )
772796 if topic .endswith ('telemetry' ) or attributes_format :
773797 if device is None or data .get (device ) is None :
774- device_split_messages = self ._split_message (data , dp_rate_limit .get_minimal_limit (),
775- self .max_payload_size )
798+ device_split_messages = self ._split_message (data , dp_rate_limit .get_minimal_limit (), self .max_payload_size )
776799 if attributes_format :
777800 split_messages = [{'message' : msg_data , 'datapoints' : len (msg_data )} for split_message in device_split_messages for msg_data in split_message ['data' ]]
778801 else :
779802 split_messages = [{'message' : split_message ['data' ], 'datapoints' : split_message ['datapoints' ]} for split_message in device_split_messages ]
780803 else :
781804 device_data = data .get (device )
782- device_split_messages = self ._split_message (device_data , dp_rate_limit .get_minimal_limit (),
783- self .max_payload_size )
805+ device_split_messages = self ._split_message (device_data , dp_rate_limit .get_minimal_limit (), self .max_payload_size )
784806 if attributes_format :
785807 split_messages = [{'message' : {device : msg_data }, 'datapoints' : len (msg_data )} for split_message in device_split_messages for msg_data in split_message ['data' ]]
786808 else :
787809 split_messages = [{'message' : {device : split_message ['data' ]}, 'datapoints' : split_message ['datapoints' ]} for split_message in device_split_messages ]
788810 else :
789- split_messages = [{'message' : data , 'datapoints' : 0 }]
811+ split_messages = [{'message' : data , 'datapoints' : self . _count_datapoints_in_message ( data , device ) }]
790812
791813 results = []
792814 for part in split_messages :
@@ -801,10 +823,18 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
801823 kwargs ["payload" ] = dumps (part ['message' ])
802824 self .wait_until_current_queued_messages_processed ()
803825 if not self .stopped :
826+ if device is not None :
827+ log .debug ("Device: %s, Sending message to topic: %s " , device , topic )
804828 if part ['datapoints' ] > 0 :
805829 log .debug ("Sending message with %i datapoints" , part ['datapoints' ])
830+ log .debug ("Message payload: %r" , kwargs ["payload" ])
831+ log .debug ("Rate limits after sending message: %r" , msg_rate_limit .__dict__ )
832+ log .debug ("Data points rate limits after sending message: %r" , dp_rate_limit .__dict__ )
806833 else :
807834 log .debug ("Sending message with %r" , kwargs ["payload" ])
835+ log .debug ("Message payload: %r" , kwargs ["payload" ])
836+ log .debug ("Rate limits after sending message: %r" , msg_rate_limit .__dict__ )
837+ log .debug ("Data points rate limits after sending message: %r" , dp_rate_limit .__dict__ )
808838 results .append (self ._client .publish (** kwargs ))
809839 return TBPublishInfo (results )
810840
@@ -956,14 +986,18 @@ def _count_datapoints_in_message(data, device=None):
956986 if isinstance (data .get (device ), list ):
957987 for data_object in data [device ]:
958988 datapoints += TBDeviceMqttClient ._count_datapoints_in_message (data_object ) # noqa
959- else :
989+ elif isinstance ( data . get ( device ), dict ) :
960990 datapoints += TBDeviceMqttClient ._count_datapoints_in_message (data .get (device , data .get ('device' )))
991+ else :
992+ datapoints += 1
961993 else :
962994 if isinstance (data , dict ):
963995 datapoints += TBDeviceMqttClient ._get_data_points_from_message (data )
964- else :
996+ elif isinstance ( data , list ) :
965997 for item in data :
966998 datapoints += TBDeviceMqttClient ._get_data_points_from_message (item )
999+ else :
1000+ datapoints += 1
9671001 return datapoints
9681002
9691003 @staticmethod
0 commit comments