@@ -1163,15 +1163,18 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
11631163 final_message_item = {'data' : [], 'datapoints' : 0 }
11641164
11651165 message_item_values_with_allowed_size = {}
1166+ ts = None
11661167 current_size = 0
11671168
11681169 for (message_index , message ) in enumerate (message_pack ):
11691170 if not isinstance (message , dict ):
11701171 log .error ("Message is not a dictionary!" )
11711172 log .debug ("Message: %s" , message )
11721173 continue
1173-
1174+ old_ts = ts if ts is not None else message . get ( "ts" )
11741175 ts = message .get ("ts" )
1176+ ts_changed = ts is not None and old_ts != ts
1177+
11751178 values = message .get ("values" , message )
11761179 values_data_keys = list (values )
11771180
@@ -1189,14 +1192,19 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
11891192 data_key_size = len (data_key ) + len (str (value ))
11901193
11911194 if ((datapoints_max_count == 0 or len (message_item_values_with_allowed_size ) < datapoints_max_count )
1192- and current_size + data_key_size < max_payload_size ):
1195+ and current_size + data_key_size < max_payload_size ) and not ts_changed :
11931196 message_item_values_with_allowed_size [data_key ] = value
11941197 current_size += data_key_size
11951198
1196- if ((datapoints_max_count > 0 and len (message_item_values_with_allowed_size ) >= datapoints_max_count + current_size // 1024 )
1197- or current_size + data_key_size >= max_payload_size ):
1199+ if ((TBDeviceMqttClient ._datapoints_limit_reached (datapoints_max_count , len (message_item_values_with_allowed_size ), current_size ))
1200+ or TBDeviceMqttClient ._payload_size_limit_reached (max_payload_size , current_size , data_key_size )) \
1201+ or ts_changed :
11981202 if ts :
1199- message_chunk = {"ts" : ts , "values" : message_item_values_with_allowed_size .copy ()}
1203+ ts_to_write = ts
1204+ if old_ts is not None and old_ts != ts :
1205+ ts_to_write = old_ts
1206+ old_ts = ts
1207+ message_chunk = {"ts" : ts_to_write , "values" : message_item_values_with_allowed_size .copy ()}
12001208 if 'metadata' in message :
12011209 message_chunk ['metadata' ] = message ['metadata' ]
12021210 final_message_item ['data' ].append (message_chunk )
@@ -1208,6 +1216,10 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
12081216 final_message_item = {'data' : [], 'datapoints' : 0 }
12091217
12101218 message_item_values_with_allowed_size .clear ()
1219+ if ts_changed :
1220+ message_item_values_with_allowed_size [data_key ] = value
1221+ current_size += data_key_size
1222+ ts_changed = False
12111223 current_size = 0
12121224
12131225 if (message_index == len (message_pack ) - 1
@@ -1221,10 +1233,19 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
12211233 final_message_item ['data' ].append (message_item_values_with_allowed_size .copy ())
12221234
12231235 final_message_item ['datapoints' ] = len (message_item_values_with_allowed_size )
1224- append_split_message (final_message_item .copy ())
1236+ if final_message_item ['data' ]:
1237+ append_split_message (final_message_item .copy ())
12251238
12261239 return split_messages
12271240
1241+ @staticmethod
1242+ def _datapoints_limit_reached (datapoints_max_count , current_datapoints_size , current_size ):
1243+ return 0 < datapoints_max_count <= current_datapoints_size + current_size // 1024
1244+
1245+ @staticmethod
1246+ def _payload_size_limit_reached (max_payload_size , current_size , additional_size ):
1247+ return current_size + additional_size >= max_payload_size
1248+
12281249 def add_attrs_request_timeout (self , attr_request_number , timeout ):
12291250 self .__attrs_request_timeout [attr_request_number ] = timeout
12301251
0 commit comments