Conversation

@akshayutture-augment

One potential problem with batch processing is that any one slow item can clog up the whole batch. This PR implements a queueing method instead, where we keep N queues that each have their own workers. There is still a chance of an individual item backlogging a single queue, but we can increase concurrency here to reduce the odds of that happening.
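The approach described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: the class and method names (`QueuePool`, `submit`, `handler`) are hypothetical, and the key-hashing routing is one plausible way to assign items to queues.

```python
import queue
import threading


class QueuePool:
    """Distribute work across N independent queues, each drained by its own
    worker thread, so a slow item only delays its own queue, not the batch."""

    def __init__(self, num_queues: int, handler):
        self.handler = handler
        self.shutdown = False
        self.queues = [queue.Queue() for _ in range(num_queues)]
        self.workers = [
            threading.Thread(target=self._worker, args=(q,), daemon=True)
            for q in self.queues
        ]
        for w in self.workers:
            w.start()

    def submit(self, key, item) -> None:
        # Hash the key so related items keep their order on the same queue.
        self.queues[hash(key) % len(self.queues)].put(item)

    def _worker(self, q: queue.Queue) -> None:
        while not self.shutdown:
            try:
                # Timeout lets the loop periodically re-check self.shutdown.
                item = q.get(timeout=1)
            except queue.Empty:
                continue
            self.handler(item)
            q.task_done()

    def close(self) -> None:
        # Wait for in-flight items, then signal workers to stop.
        for q in self.queues:
            q.join()
        self.shutdown = True
```

With more queues (or more workers per queue), a single slow item blocks a smaller fraction of the total throughput.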

Copilot AI review requested due to automatic review settings November 14, 2025 22:42

Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.


Copilot AI left a comment


Pull Request Overview

Copilot reviewed 5 out of 7 changed files in this pull request and generated 3 comments.



```python
"""Process items from the queue in order."""
while not self.shutdown:
    try:
        work_item = self.work_queue.get()
```

Copilot AI Nov 14, 2025


The get() call will block indefinitely when the queue is empty. This prevents the worker thread from shutting down gracefully when self.shutdown is set to True. Use get(timeout=1) with appropriate exception handling to allow periodic shutdown checks.

Suggested change

```diff
-        work_item = self.work_queue.get()
+        work_item = self.work_queue.get(timeout=1)
+    except queue.Empty:
+        continue
```

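A self-contained demonstration of the pattern the review comment asks for. This is an illustrative sketch, not the PR's class: `Worker` and `processed` are hypothetical names, but the loop shape mirrors the reviewed code.

```python
import queue
import threading
import time


class Worker:
    """Minimal worker whose loop polls with a timeout so it can observe
    the shutdown flag even when the queue stays empty."""

    def __init__(self):
        self.shutdown = False
        self.work_queue = queue.Queue()
        self.processed = []

    def run(self):
        while not self.shutdown:
            try:
                # A bare get() would block forever on an empty queue, and
                # the thread would never re-check self.shutdown.
                work_item = self.work_queue.get(timeout=1)
            except queue.Empty:
                continue
            self.processed.append(work_item)
```

With the timeout, setting `shutdown = True` stops the thread within about one second even if no further items arrive.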
```python
    ) -> None:
        self.mode = mode
        metric_tags = {"identifier": self.identifier, "mode": self.mode}
        self.result_processor = self.result_processor_cls()
```

Copilot AI Nov 14, 2025


The result_processor is now instantiated before checking the mode, but previously it was created after all mode-specific setup. This changes the initialization order which could affect mode-specific configurations that might depend on the processor's state. Consider whether this reordering is intentional and document why it's necessary.

Copilot uses AI. Check for mistakes.
Comment on lines +2081 to +2106
```python
factory.create_with_partitions = create_with_partitions_tracking  # type: ignore[method-assign]
consumer_config = build_kafka_consumer_configuration(
    settings.KAFKA_CLUSTERS["default"]["common"],
    group_id=consumer_group,
    auto_offset_reset="earliest",
)

consumer = KafkaConsumer(consumer_config)
processor = StreamProcessor(
    consumer=consumer,
    topic=Topic(test_topic),
    processor_factory=factory,
    commit_policy=ONCE_PER_SECOND,
)

with mock.patch.object(
    type(factory.result_processor), "__call__"
) as mock_processor:
    mock_processor.return_value = None

    start_time = time.time()
    while time.time() - start_time < 5:
        processor._run_once()
        time.sleep(0.1)

    processor._shutdown()
```

Copilot AI Nov 14, 2025


Directly assigning to a method bypasses Python's descriptor protocol and can lead to unexpected behavior. Consider using unittest.mock.patch.object() or wrapping the factory in a test subclass that overrides the method properly.

Suggested change

```diff
-factory.create_with_partitions = create_with_partitions_tracking  # type: ignore[method-assign]
-consumer_config = build_kafka_consumer_configuration(
-    settings.KAFKA_CLUSTERS["default"]["common"],
-    group_id=consumer_group,
-    auto_offset_reset="earliest",
-)
-consumer = KafkaConsumer(consumer_config)
-processor = StreamProcessor(
-    consumer=consumer,
-    topic=Topic(test_topic),
-    processor_factory=factory,
-    commit_policy=ONCE_PER_SECOND,
-)
-with mock.patch.object(
-    type(factory.result_processor), "__call__"
-) as mock_processor:
-    mock_processor.return_value = None
-    start_time = time.time()
-    while time.time() - start_time < 5:
-        processor._run_once()
-        time.sleep(0.1)
-    processor._shutdown()
+with mock.patch.object(factory, "create_with_partitions", create_with_partitions_tracking):
+    consumer_config = build_kafka_consumer_configuration(
+        settings.KAFKA_CLUSTERS["default"]["common"],
+        group_id=consumer_group,
+        auto_offset_reset="earliest",
+    )
+    consumer = KafkaConsumer(consumer_config)
+    processor = StreamProcessor(
+        consumer=consumer,
+        topic=Topic(test_topic),
+        processor_factory=factory,
+        commit_policy=ONCE_PER_SECOND,
+    )
+    with mock.patch.object(
+        type(factory.result_processor), "__call__"
+    ) as mock_processor:
+        mock_processor.return_value = None
+        start_time = time.time()
+        while time.time() - start_time < 5:
+            processor._run_once()
+            time.sleep(0.1)
+        processor._shutdown()
```

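The practical difference the review comment is pointing at can be shown in isolation. This is a standalone sketch with hypothetical names (a toy `Factory` class, not the PR's factory): direct assignment leaves the override in place until it is manually undone, while `mock.patch.object` restores the original attribute automatically when the context exits.

```python
from unittest import mock


class Factory:
    def create_with_partitions(self):
        return "real"


factory = Factory()

# Direct assignment sticks to the instance and leaks past the test
# unless someone remembers to undo it:
factory.create_with_partitions = lambda: "patched"
leaked = factory.create_with_partitions()
del factory.create_with_partitions  # manual cleanup required

# patch.object scopes the override to the with-block and restores
# the original class method on exit:
with mock.patch.object(factory, "create_with_partitions", lambda: "patched"):
    inside = factory.create_with_partitions()
restored = factory.create_with_partitions()
```

Here `leaked` and `inside` are both `"patched"`, but only the `patch.object` path gets back to `"real"` without explicit cleanup, which is why it is the safer pattern in tests.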
