Skip to content

Commit

Permalink
Azure Service Bus (#208)
Browse files Browse the repository at this point in the history
* Azure Service Bus

This update removes the code that attaches source and partition information to the Kafka SourceRecord created.

 Currently, these details are stored even though they are not used when resuming a connector instance—primarily because, given that the message originates from either a topic or a queue, there isn’t a specific resume point.

Another challenge addressed as a result of the change is the potential for high memory usage. The existing implementation of Azure Service Bus’s .getPartitionKey method builds the partition key by concatenating all available annotations from the message. This approach can generate very large strings when messages contain many annotations, leading to increased memory consumption.

* Removes the obsolete testing class

---------

Co-authored-by: stheppi <[email protected]>
  • Loading branch information
stheppi and stheppi authored Feb 3, 2025
1 parent 7e6e823 commit 59856da
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector;
import java.time.Instant;
import java.util.Map;
import java.util.Collections;
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -68,21 +68,17 @@ private ServiceBusToSourceRecordMapper() {
*
* @param serviceBusMessage original Service Bus message
* @param outputTopic Output topic for record
* @param partitionKey AzureTopicPartitionKey to indicate topic and partition
* @param offsetMap AzureOffsetMarker to indicate offset
* @return mapped SourceRecord
*/
public static SourceRecord mapSingleServiceBusMessage(ServiceBusReceivedMessage serviceBusMessage,
String outputTopic,
Map<String, String> partitionKey,
Map<String, Object> offsetMap) {
String outputTopic) {
long before = System.currentTimeMillis();
String key = serviceBusMessage.getMessageId();

Struct valueObject = createStructFromServiceBusMessage(serviceBusMessage);
long after = System.currentTimeMillis();
log.debug("Mapping of message with id {} took {} ms", key, after - before);
return new AzureServiceBusSourceRecord(partitionKey, offsetMap, outputTopic, key,
return new AzureServiceBusSourceRecord(Collections.emptyMap(), Collections.emptyMap(), outputTopic, key,
VALUE_SCHEMA, valueObject, Instant.now().toEpochMilli());
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import io.lenses.kcql.Kcql;
import io.lenses.streamreactor.connect.azure.servicebus.source.ServiceBusPartitionOffsetProvider.AzureServiceBusOffsetMarker;
import io.lenses.streamreactor.connect.azure.servicebus.source.ServiceBusPartitionOffsetProvider.AzureServiceBusPartitionKey;
import io.lenses.streamreactor.connect.azure.servicebus.util.ServiceBusKcqlProperties;
import io.lenses.streamreactor.connect.azure.servicebus.util.ServiceBusType;
import lombok.Getter;
Expand Down Expand Up @@ -144,13 +142,7 @@ public static Consumer<ServiceBusReceivedMessage> onSuccessfulMessage(
String inputBus,
String outputTopic) {
return message -> {
long sequenceNumber = message.getSequenceNumber();
AzureServiceBusOffsetMarker offsetMarker =
new AzureServiceBusOffsetMarker(sequenceNumber);
AzureServiceBusPartitionKey partitionKey =
new AzureServiceBusPartitionKey(inputBus, message.getPartitionKey());

SourceRecord sourceRecord = mapSingleServiceBusMessage(message, outputTopic, partitionKey, offsetMarker);
SourceRecord sourceRecord = mapSingleServiceBusMessage(message, outputTopic);
ServiceBusMessageHolder serviceBusMessageHolder =
new ServiceBusMessageHolder(message, sourceRecord, receiverId);
boolean offer = false;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;

import com.azure.core.amqp.models.AmqpAnnotatedMessage;
import com.azure.core.amqp.models.AmqpMessageBody;
Expand All @@ -34,8 +35,6 @@
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;

import io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector;
import io.lenses.streamreactor.connect.azure.servicebus.source.ServiceBusPartitionOffsetProvider.AzureServiceBusOffsetMarker;
import io.lenses.streamreactor.connect.azure.servicebus.source.ServiceBusPartitionOffsetProvider.AzureServiceBusPartitionKey;

class ServiceBusToSourceRecordMapperTest {

Expand All @@ -61,19 +60,16 @@ class ServiceBusToSourceRecordMapperTest {
void mapSingleSourceRecordWitAllParameters() {
//given
ServiceBusReceivedMessage busMessage = prepareMessageBusWithAllConsumedFields();
AzureServiceBusPartitionKey partitionKey = new AzureServiceBusPartitionKey(OUTPUT_TOPIC, PARTITION_KEY);
AzureServiceBusOffsetMarker busOffsetMarker = new AzureServiceBusOffsetMarker(SEQUENCE_NUMBER);

//when
SourceRecord sourceRecord =
ServiceBusToSourceRecordMapper.mapSingleServiceBusMessage(busMessage, OUTPUT_TOPIC, partitionKey,
busOffsetMarker);
ServiceBusToSourceRecordMapper.mapSingleServiceBusMessage(busMessage, OUTPUT_TOPIC);

//then
assertThat(sourceRecord)
.returns(partitionKey, from(SourceRecord::sourcePartition))
.returns(Collections.emptyMap(), from(SourceRecord::sourcePartition))
.returns(null, from(SourceRecord::kafkaPartition))
.returns(busOffsetMarker, from(SourceRecord::sourceOffset))
.returns(Collections.emptyMap(), from(SourceRecord::sourceOffset))
.returns(OUTPUT_TOPIC, from(SourceRecord::topic))
.returns(Schema.STRING_SCHEMA, from(SourceRecord::keySchema))
.returns(ServiceBusToSourceRecordMapper.VALUE_SCHEMA, from(SourceRecord::valueSchema));
Expand All @@ -87,19 +83,16 @@ void mapSingleSourceRecordWitAllParameters() {
void mapSingleSourceRecordAllowsForOptionalSchemaFieldsToBeNull() {
//given
ServiceBusReceivedMessage busMessage = prepareMessageBusWithOnlyRequiredFields();
AzureServiceBusPartitionKey partitionKey = new AzureServiceBusPartitionKey(OUTPUT_TOPIC, PARTITION_KEY);
AzureServiceBusOffsetMarker busOffsetMarker = new AzureServiceBusOffsetMarker(SEQUENCE_NUMBER);

//when
SourceRecord sourceRecord =
ServiceBusToSourceRecordMapper.mapSingleServiceBusMessage(busMessage, OUTPUT_TOPIC, partitionKey,
busOffsetMarker);
ServiceBusToSourceRecordMapper.mapSingleServiceBusMessage(busMessage, OUTPUT_TOPIC);

//then
assertThat(sourceRecord)
.returns(partitionKey, from(SourceRecord::sourcePartition))
.returns(Collections.emptyMap(), from(SourceRecord::sourcePartition))
.returns(null, from(SourceRecord::kafkaPartition))
.returns(busOffsetMarker, from(SourceRecord::sourceOffset))
.returns(Collections.emptyMap(), from(SourceRecord::sourceOffset))
.returns(OUTPUT_TOPIC, from(SourceRecord::topic))
.returns(Schema.STRING_SCHEMA, from(SourceRecord::keySchema))
.returns(ServiceBusToSourceRecordMapper.VALUE_SCHEMA, from(SourceRecord::valueSchema));
Expand Down

0 comments on commit 59856da

Please sign in to comment.