Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion extensions/eda/plugins/event_source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,21 @@ async def receive_msg(
except UnicodeError:
logger.exception("Unicode error while decoding headers")

# Process key
try:
key = msg.key.decode(encoding)
data = json.loads(key)
except json.decoder.JSONDecodeError:
logger.info("JSON decode error, storing raw key")
data = key
except UnicodeError:
logger.exception("Unicode error while decoding key")
data = None

# Add key to the event and put it into the queue
if data:
event["meta"]["key"] = data

# Process message body
try:
value = msg.value.decode(encoding)
Expand All @@ -260,7 +275,7 @@ async def receive_msg(
logger.exception("Unicode error while decoding message body")
data = None

# Add data to the event and put it into the queue
# Add body to the event and put it into the queue
if data:
event["body"] = data
await queue.put(event)
Expand Down
Loading