@@ -233,15 +233,20 @@ def publish(self, name, payload):
233233 with pools .producers [connection ].acquire () as producer :
234234 log .debug ("%sHave producer for publishing to key %s" , publish_log_prefix , key )
235235 publish_kwds = self .__prepare_publish_kwds (publish_log_prefix )
236- producer .publish (
237- payload ,
238- serializer = 'json' ,
239- exchange = self .__exchange ,
240- declare = [self .__exchange ],
241- routing_key = key ,
242- ** publish_kwds
243- )
244- log .debug ("%sPublished to key %s" , publish_log_prefix , key )
236+ try :
237+ producer .publish (
238+ payload ,
239+ serializer = 'json' ,
240+ exchange = self .__exchange ,
241+ declare = [self .__exchange ],
242+ routing_key = key ,
243+ ** publish_kwds
244+ )
245+ log .debug ("%sPublished to key %s" , publish_log_prefix , key )
246+ return True
247+ except Exception as e :
248+ log .error ("%sFailed to publish to key %s: %s" , publish_log_prefix , key , str (e ))
249+ self .__fail_publish (name , payload , e )
245250
246251 def ack_manager (self ):
247252 log .debug ('Acknowledgement manager thread alive' )
@@ -261,15 +266,37 @@ def ack_manager(self):
261266 'republishing original message on queue %s' ,
262267 unack_uuid , resubmit_queue )
263268 try :
264- self .publish (resubmit_queue , payload )
265- self .publish_uuid_store .set_time (unack_uuid )
269+ if self .publish (resubmit_queue , payload ):
270+ self .publish_uuid_store .set_time (unack_uuid )
271+ else :
272+ # If we fail to publish, we need to remove the uuid from the store
273+ # so it doesn't get republished again.
274+ self .__discard_publish_uuid (unack_uuid , failed )
266275 except self .recoverable_exceptions as e :
267276 self .__handle_io_error (e )
268277 continue
269278 except Exception :
270- log .exception ("Problem with acknowledgement manager, leaving ack_manager method in problematic state!" )
279+ log .exception ("Problem with acknowledgement manager, leaving ack manager in problematic state!" )
271280 raise
272- log .debug ('Acknowledgement manager thread exiting' )
281+
def __fail_publish(self, name, payload, exception):
    """Publish a reduced "failed" notice after ``payload`` could not be published.

    Only a short allow-list of keys is copied from ``payload``; the stringified
    ``exception`` and a ``"failed"`` status are added, and the result is handed
    to ``self.publish`` under the same ``name``.

    ``publish`` routes its own errors back to this method, so a notice that
    itself fails to publish would otherwise recurse without bound. A payload
    that already carries this method's failure markers (``status == "failed"``
    together with an ``exception`` key) is therefore logged and dropped instead
    of being sent again.

    :param name: queue name passed straight through to ``self.publish``.
    :param payload: the message that failed to publish. Expected to be a dict;
        any other value contributes no keys to the notice.
    :param exception: the error raised by the failed publish attempt.
    """
    is_mapping = isinstance(payload, dict)

    # Recursion guard: this payload is already a failure notice, so publishing
    # yet another one would just fail the same way and call back in here.
    if is_mapping and payload.get("status") == "failed" and "exception" in payload:
        log.error(
            "Failed to publish failure notice for %s, giving up: %s",
            name,
            exception,
        )
        return

    # Send just a few safe keys if we have them:
    keys_to_send = (
        "job_id",
        "returncode",
        "stdout",
        "stderr",
        "job_stdout",
        "job_stderr",
    )
    source = payload if is_mapping else {}
    new_payload = {key: source[key] for key in keys_to_send if key in source}

    # Record why the original publish failed and mark the notice as a failure.
    new_payload["exception"] = str(exception)
    new_payload["status"] = "failed"
    self.publish(name, new_payload)
273300
274301 def __get_payload (self , uuid , failed ):
275302 """Retry reading a message from the publish_uuid_store once, delete on the second failure."""
0 commit comments