This repository was archived by the owner on Jan 25, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
80 lines (64 loc) · 2.9 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from src.config import KAFKA_ENDPOINT, KAFKA_INPUT_TOPIC, \
KAFKA_LOGGING_TOPIC, POSTS_ENDPOINT, SERVICE_KEY
from src.model.topic_extractor import TopicExtractor
from src.kafka.kafka_consumer import Consumer
from src.kafka.kafka_producer import Producer
from src.util.logging_handler import KafkaLoggingHandler
from src.util.decorators import background
import logging
# Module-level logger keyed by the service name, so every module that asks
# for SERVICE_KEY shares the same handlers configured in _setup_logging().
logger = logging.getLogger(SERVICE_KEY)
logger.setLevel(logging.DEBUG)
def main():
    """Entrypoint: consume posts from Kafka and process each one.

    Configures logging, then blocks indefinitely iterating over messages
    from the input topic. Every message is dispatched to the background
    handler, which runs topic extraction and forwards the results to the
    Posts service. If the consumer iterator ever terminates, the Kafka
    connection was lost and the program shuts down.
    """
    _setup_logging()
    kafka_consumer = Consumer(KAFKA_ENDPOINT, KAFKA_INPUT_TOPIC)
    kafka_producer = Producer(KAFKA_ENDPOINT)
    # The consumer is created without a timeout, so under normal operation
    # this loop never ends.
    for message in kafka_consumer:
        logger.info(f"Message received from Kafka: {message.value}")
        _on_message_received(message, kafka_producer)
    logger.warning("Connection to Kafka was lost. Stopping program")
@background
def _on_message_received(message, producer):
    """Process a single Kafka message off the main thread.

    Runs in the background (see the decorator implementation in the util
    package). Decodes the post body, extracts its topics, and queues the
    result for delivery to the Posts service via a SOAP request.
    """
    # NOTE: post language detection could be plugged in here; for now the
    # content is assumed to be in English.
    content = message.value.decode('utf-8')
    extracted = TopicExtractor().extract_topics_from(content)
    _send_topics_to_queue(message.key, extracted, producer)
def _send_topics_to_queue(post_id, topics, producer):
    """Encode *topics* and publish them on the "topics" queue, keyed by post id."""
    logger.info(f"Send topics to queue: {topics}")
    payload = _encode_topics(topics)
    producer.produce("topics", post_id, payload)
def _encode_topics(topics):
""" Utility method used to encode the array of topics
"""
res = bytearray()
# 1st byte -> number of elements
res += bytes([len(topics)])
for topic, score in topics:
encoded_topic = bytearray(topic.encode('utf-8'))
# following byte -> number of bytes of the next string
res += bytes([len(encoded_topic)])
# following bytes -> string encoded to bytes
res += encoded_topic
return res
def _setup_logging():
    """Attach the console and Kafka handlers to the service logger, idempotently.

    Bug fix: the original guard only protected the ``StreamHandler``; the
    ``KafkaLoggingHandler`` was appended on *every* call, so repeated calls
    duplicated each log record sent to Kafka. All handler registration is
    now performed only when the logger has no handlers yet.
    """
    if logger.handlers:
        # Already configured — do not attach duplicate handlers.
        return
    logger.addHandler(logging.StreamHandler())
    logger.debug(f"Starting kafka logging with endpoint '{KAFKA_ENDPOINT}' and topic '{KAFKA_LOGGING_TOPIC}'")
    kh = KafkaLoggingHandler(KAFKA_ENDPOINT, key="topic_modeling", topic=KAFKA_LOGGING_TOPIC)
    logger.addHandler(kh)
    logger.debug("Logging system started")
if __name__ == '__main__':
    # main() blocks forever in normal operation and returns None when the
    # Kafka connection drops, which maps to exit status 0. SystemExit is
    # raised directly instead of calling the site-provided exit() builtin,
    # which is unavailable when the interpreter runs with -S.
    raise SystemExit(main())