 import datetime
 import json
 import logging
-import multiprocessing
+from multiprocessing import Queue
 import os
 import socket
 import sys
 from threading import Lock, Thread, Timer
 import time
+import random

 from kafka import KafkaProducer  # pylint: disable=import-error

@@ -46,20 +47,22 @@ class KafkaLoggingHandler(logging.Handler):
     __LOGGING_FILTER_FIELDS = ["msecs", "relativeCreated", "levelno", "created"]
     __MULTIPROCESSING_QUEUE_FLUSH_DELAY = 0.2

-    # pylint: disable=too-many-arguments
     def __init__(
         self,
         hosts_list,
         topic,
         security_protocol="SSL",
         ssl_cafile=None,
         kafka_producer_args=None,
+        kafka_producer_init_retries=0,
+        kafka_producer_init_delay_ms=3000,
+        kafka_producer_init_delay_rand_ms=500,
         additional_fields=None,
         flush_buffer_size=None,
         flush_interval=5.0,
         unhandled_exception_logger=None,
         log_preprocess=None,
-    ):
+    ):  # pylint: disable=too-many-arguments,too-many-locals
         """
         Initialize the handler.

@@ -70,6 +73,12 @@ def __init__(
             ssl_cafile (None, optional): path to CA file
             kafka_producer_args (None, optional):
                 extra arguments to pass to KafkaProducer
+            kafka_producer_init_retries (int, optional):
+                number of additional attempts to initialize the Kafka producer
+            kafka_producer_init_delay_ms (int, optional):
+                fixed delay between producer initialization attempts, in ms
+            kafka_producer_init_delay_rand_ms (int, optional):
+                upper bound of the random extra delay per attempt, in ms
             additional_fields (None, optional):
                 A dictionary with all the additional fields that you would like
                 to add to the logs, such as the application, environment, etc.
@@ -87,62 +96,85 @@ def __init__(

         """
         logging.Handler.__init__(self)
+        self.enabled = False

-        if security_protocol == "SSL" and ssl_cafile is None:
-            raise KafkaLoggerException("SSL CA file isn't provided.")
+        try:
+            if security_protocol == "SSL" and ssl_cafile is None:
+                raise KafkaLoggerException("SSL CA file isn't provided.")

-        self.kafka_topic_name = topic
-        self.unhandled_exception_logger = unhandled_exception_logger
+            self.kafka_topic_name = topic
+            self.unhandled_exception_logger = unhandled_exception_logger

-        self.buffer = []
-        self.buffer_lock = Lock()
-        self.max_buffer_size = (
-            flush_buffer_size if flush_buffer_size is not None else float("inf")
-        )
-        self.flush_interval = flush_interval
-        self.timer = None
-        self.additional_fields = {}
-        if additional_fields:
-            self.additional_fields = additional_fields.copy()
-        self.additional_fields.update(
-            {
-                "host": socket.gethostname(),
-                "host_ip": socket.gethostbyname(socket.gethostname()),
-            }
-        )
-        self.log_preprocess = log_preprocess if log_preprocess is not None else []
-
-        if kafka_producer_args is None:
-            kafka_producer_args = {}
-
-        self.producer = KafkaProducer(
-            bootstrap_servers=hosts_list,
-            security_protocol=security_protocol,
-            ssl_cafile=ssl_cafile,
-            value_serializer=lambda msg: json.dumps(msg).encode("utf-8"),
-            **kafka_producer_args
-        )
-
-        # setup exit hooks
-        # exit hooks work only in main process
-        # termination of child processes uses os.exit() and ignore any hooks
-        atexit.register(self.at_exit)
-
-        # Dont touch sys.excepthook if no logger provided
-        if self.unhandled_exception_logger is not None:
-            sys.excepthook = self.unhandled_exception
-
-        # multiprocessing support
-        self.main_process_pid = os.getpid()
-        self.mp_log_queue = multiprocessing.Queue()
-        # main process thread that will flush mp queue
-        self.mp_log_handler_flush_lock = Lock()
-        self.mp_log_handler_thread = Thread(
-            target=self.mp_log_handler, name="Kafka Logger Multiprocessing Handler"
-        )
-        # daemon will terminate with the main process
-        self.mp_log_handler_thread.setDaemon(True)
-        self.mp_log_handler_thread.start()
+            self.buffer = []
+            self.buffer_lock = Lock()
+            self.max_buffer_size = (
+                flush_buffer_size if flush_buffer_size is not None else float("inf")
+            )
+            self.flush_interval = flush_interval
+            self.timer = None
+            self.additional_fields = (
+                additional_fields.copy() if additional_fields is not None else {}
+            )
+            self.additional_fields.update(
+                {
+                    "host": socket.gethostname(),
+                    "host_ip": socket.gethostbyname(socket.gethostname()),
+                }
+            )
+            self.log_preprocess = log_preprocess if log_preprocess is not None else []
+
+            if kafka_producer_args is None:
+                kafka_producer_args = {}
+
+            for init_attempt in range(kafka_producer_init_retries + 1, 0, -1):
+                try:
+                    self.producer = KafkaProducer(
+                        bootstrap_servers=hosts_list,
+                        security_protocol=security_protocol,
+                        ssl_cafile=ssl_cafile,
+                        value_serializer=lambda msg: json.dumps(msg).encode("utf-8"),
+                        **kafka_producer_args
+                    )
+                except Exception:  # pylint: disable=broad-except
+                    if init_attempt == 1:  # the last attempt failed
+                        raise
+                    logging.exception("Exception during Kafka producer init")
+                    attempt_delay = kafka_producer_init_delay_ms + random.randint(
+                        0, kafka_producer_init_delay_rand_ms
+                    )
+                    logging.debug("Sleeping %d ms", attempt_delay)
+                    time.sleep(attempt_delay / 1000)
+                else:  # the producer was created successfully
+                    break
+
+            # set up exit hooks
+            # exit hooks work only in the main process;
+            # child processes terminate through os._exit() and ignore any hooks
+            atexit.register(self.at_exit)
+
+            # don't touch sys.excepthook if no logger is provided
+            if self.unhandled_exception_logger is not None:
+                sys.excepthook = self.unhandled_exception
+
+            # multiprocessing support
+            self.main_process_pid = os.getpid()
+            self.mp_log_queue = Queue()
+            # main process thread that will flush the mp queue
+            self.mp_log_handler_flush_lock = Lock()
+            self.mp_log_handler_thread = Thread(
+                target=self.mp_log_handler, name="Kafka Logger Multiprocessing Handler"
+            )
+            # the daemon thread terminates with the main process
+            self.mp_log_handler_thread.daemon = True
+            self.mp_log_handler_thread.start()
+
+            self.enabled = True
+
+        except Exception:  # pylint: disable=broad-except
+            logging.exception("Startup error of the Kafka logging handler")
+
+            # tear down the failed startup
+            atexit.unregister(self.at_exit)

     def prepare_record_dict(self, record):
         """
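The retry loop above counts down with `range(kafka_producer_init_retries + 1, 0, -1)` and uses `try/except/else`: the `else` branch runs only when `KafkaProducer` was constructed without raising, and breaks out of the loop. A minimal standalone sketch of the same bounded-retry-with-jitter pattern (`init_with_retries` and `factory` are hypothetical names, not part of this module):

    import random
    import time

    def init_with_retries(factory, retries=0, delay_ms=3000, rand_ms=500):
        # call factory() up to retries + 1 times before giving up
        for attempts_left in range(retries + 1, 0, -1):
            try:
                return factory()
            except Exception:
                if attempts_left == 1:  # the final attempt also failed
                    raise
                # fixed delay plus random jitter, so restarting clients
                # don't retry against the brokers in lockstep
                time.sleep((delay_ms + random.randint(0, rand_ms)) / 1000)
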
@@ -204,6 +236,9 @@ def emit(self, record):
         if record.name == "kafka.client":
             return

+        if not self.enabled:
+            return
+
         record_dict = self.prepare_record_dict(record)

         if os.getpid() == self.main_process_pid:
@@ -236,10 +271,15 @@ def flush(self):
         Skip if the buffer is empty.
         Uses a threading lock to access the buffer.
         """
+        # logging.shutdown() can trigger flush() directly
+        # main_process_pid is unknown if startup failed
+        if not self.enabled:
+            return
+
         # if flush is triggered in a child process => skip
-        # logging.shutdown() can trigger flush()
         if os.getpid() != self.main_process_pid:
             return
+
         # clean up the timer (reached max buffer size)
         if self.timer is not None and self.timer.is_alive():
             self.timer.cancel()
@@ -299,7 +339,7 @@ def at_exit(self):
299339 "main process termination. This may cause logs loss." ,
300340 len (children ),
301341 )
302- while self .mp_log_queue .qsize () != 0 :
342+ while not self .mp_log_queue .empty () :
303343 time .sleep (KafkaLoggingHandler .__MULTIPROCESSING_QUEUE_FLUSH_DELAY )
304344 # wait until everything in multiprocessing queue will be buffered
305345 self .mp_log_handler_flush_lock .acquire ()
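With the new parameters, producer initialization can survive transient broker outages at application startup. A minimal usage sketch (the import path, broker addresses, topic, and CA path are placeholders, not taken from this diff):

    import logging

    from kafka_logger.handlers import KafkaLoggingHandler  # import path is an assumption

    handler = KafkaLoggingHandler(
        ["kafka-1:9093", "kafka-2:9093"],       # placeholder brokers
        "service-logs",                         # placeholder topic
        ssl_cafile="/etc/ssl/kafka_ca.pem",     # placeholder CA file
        kafka_producer_init_retries=3,          # up to 4 attempts in total
        kafka_producer_init_delay_ms=3000,      # fixed part of the retry delay
        kafka_producer_init_delay_rand_ms=500,  # jitter upper bound
    )

    logger = logging.getLogger(__name__)
    logger.addHandler(handler)
    logger.info("logging straight to Kafka")

Note that even when every attempt fails, the outer try/except swallows the error, leaves `self.enabled` as `False`, and unregisters the exit hook, so `emit()` and `flush()` become no-ops and the application keeps running without the Kafka sink.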