Skip to content

Commit 827da4a

Browse files
committed
Retry initialization of Kafka producer
- multiple attempts - configurable attempts delay - randomized attempt delays
1 parent 03dcbae commit 827da4a

File tree

1 file changed

+36
-12
lines changed

1 file changed

+36
-12
lines changed

kafka_logger/handlers.py

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44
import datetime
55
import json
66
import logging
7-
import multiprocessing
7+
from multiprocessing import Queue
88
import os
99
import socket
1010
import sys
1111
from threading import Lock, Thread, Timer
1212
import time
13+
import random
1314

1415
from kafka import KafkaProducer # pylint: disable=import-error
1516

@@ -46,20 +47,22 @@ class KafkaLoggingHandler(logging.Handler):
4647
__LOGGING_FILTER_FIELDS = ["msecs", "relativeCreated", "levelno", "created"]
4748
__MULTIPROCESSING_QUEUE_FLUSH_DELAY = 0.2
4849

49-
# pylint: disable=too-many-arguments
5050
def __init__(
5151
self,
5252
hosts_list,
5353
topic,
5454
security_protocol="SSL",
5555
ssl_cafile=None,
5656
kafka_producer_args=None,
57+
kafka_producer_init_retries=0,
58+
kafka_producer_init_delay_ms=3000,
59+
kafka_producer_init_delay_rand_ms=500,
5760
additional_fields=None,
5861
flush_buffer_size=None,
5962
flush_interval=5.0,
6063
unhandled_exception_logger=None,
6164
log_preprocess=None,
62-
):
65+
): # pylint: disable=too-many-arguments,too-many-locals
6366
"""
6467
Initialize the handler.
6568
@@ -70,6 +73,12 @@ def __init__(
7073
ssl_cafile (None, optional): path to CA file
7174
kafka_producer_args (None, optional):
7275
extra arguments to pass to KafkaProducer
76+
kafka_producer_init_retries (int, optional):
77+
number of additional attempts to initialize Kafka producer
78+
kafka_producer_init_delay_ms (int, optional):
79+
static delay for attempts to initialize producer
80+
kafka_producer_init_delay_rand_ms (int, optional):
81+
randomized delay for attempts to initialize producer
7382
additional_fields (None, optional):
7483
A dictionary with all the additional fields that you would like
7584
to add to the logs, such the application, environment, etc.
@@ -103,7 +112,9 @@ def __init__(
103112
)
104113
self.flush_interval = flush_interval
105114
self.timer = None
106-
self.additional_fields = additional_fields.copy()
115+
self.additional_fields = (
116+
additional_fields.copy() if additional_fields is not None else {}
117+
)
107118
self.additional_fields.update(
108119
{
109120
"host": socket.gethostname(),
@@ -115,13 +126,26 @@ def __init__(
115126
if kafka_producer_args is None:
116127
kafka_producer_args = {}
117128

118-
self.producer = KafkaProducer(
119-
bootstrap_servers=hosts_list,
120-
security_protocol=security_protocol,
121-
ssl_cafile=ssl_cafile,
122-
value_serializer=lambda msg: json.dumps(msg).encode("utf-8"),
123-
**kafka_producer_args
124-
)
129+
for init_attempt in range(kafka_producer_init_retries + 1, 0, -1):
130+
try:
131+
self.producer = KafkaProducer(
132+
bootstrap_servers=hosts_list,
133+
security_protocol=security_protocol,
134+
ssl_cafile=ssl_cafile,
135+
value_serializer=lambda msg: json.dumps(msg).encode("utf-8"),
136+
**kafka_producer_args
137+
)
138+
except Exception: # pylint: disable=broad-except
139+
if init_attempt == 1: # last attempt failed
140+
raise
141+
logging.exception("Exception during Kafka producer init")
142+
attempt_delay = kafka_producer_init_delay_ms + random.randint(
143+
0, kafka_producer_init_delay_rand_ms
144+
)
145+
logging.debug("Sleeping %d ms", attempt_delay)
146+
time.sleep(attempt_delay / 1000)
147+
else:
148+
break
125149

126150
# setup exit hooks
127151
# exit hooks work only in main process
@@ -146,7 +170,7 @@ def __init__(
146170

147171
self.enabled = True
148172

149-
except Exception:
173+
except Exception: # pylint: disable=broad-except
150174
logging.exception("Startup error of the Kafka logging handler")
151175

152176
# teardown failed startup

0 commit comments

Comments
 (0)