@@ -167,16 +167,27 @@ async def connect_workflow(self, w_id, contact_data):
167
167
self ._update_contact (w_id , contact_data )
168
168
169
169
@log_call
170
- def disconnect_workflow (self , w_id ):
170
+ def disconnect_workflow (self , w_id , update_contact = True ):
171
171
"""Terminate workflow subscriptions.
172
172
173
173
Call this when a workflow has stopped.
174
174
"""
175
- self ._update_contact (
176
- w_id ,
177
- status = WorkflowStatus .STOPPED .value ,
178
- status_msg = self ._get_status_msg (w_id , False ),
179
- )
175
+ disconnect_msg = self ._get_status_msg (w_id , False )
176
+ if (
177
+ update_contact
178
+ and w_id in self .data
179
+ and (
180
+ self .data [w_id ][WORKFLOW ].status != (
181
+ WorkflowStatus .STOPPED .value
182
+ )
183
+ or self .data [w_id ][WORKFLOW ].status_msg != disconnect_msg
184
+ )
185
+ ):
186
+ self ._update_contact (
187
+ w_id ,
188
+ status = WorkflowStatus .STOPPED .value ,
189
+ status_msg = disconnect_msg ,
190
+ )
180
191
if w_id in self .w_subs :
181
192
self .w_subs [w_id ].stop ()
182
193
del self .w_subs [w_id ]
@@ -202,7 +213,9 @@ def get_workflows(self):
202
213
@log_call
203
214
def _purge_workflow (self , w_id ):
204
215
"""Purge the manager of a workflow's subscription and data."""
205
- self .disconnect_workflow (w_id )
216
+ # Ensure no old/new subscriptions exist on purge,
217
+ # this shouldn't happen if disconnect is run before unregister.
218
+ self .disconnect_workflow (w_id , update_contact = False )
206
219
if w_id in self .data :
207
220
del self .data [w_id ]
208
221
if w_id in self .delta_queues :
@@ -251,12 +264,6 @@ def _update_workflow_data(self, topic, delta, w_id):
251
264
continue
252
265
if topic == 'shutdown' :
253
266
self ._delta_store_to_queues (w_id , topic , delta )
254
- # update the status to stopped and set the status message
255
- self ._update_contact (
256
- w_id ,
257
- status = WorkflowStatus .STOPPED .value ,
258
- status_msg = self ._get_status_msg (w_id , False ),
259
- )
260
267
# close connections
261
268
self .disconnect_workflow (w_id )
262
269
return
0 commit comments