diff --git a/connectors/camel-azure-eventhubs-source-kafka-connector/src/generated/resources/camel-azure-eventhubs-source-source.json b/connectors/camel-azure-eventhubs-source-kafka-connector/src/generated/resources/camel-azure-eventhubs-source-source.json index a91402bdd6..afe74fe95a 100644 --- a/connectors/camel-azure-eventhubs-source-kafka-connector/src/generated/resources/camel-azure-eventhubs-source-source.json +++ b/connectors/camel-azure-eventhubs-source-kafka-connector/src/generated/resources/camel-azure-eventhubs-source-source.json @@ -36,19 +36,19 @@ "camel.kamelet.azure-eventhubs-source.blobAccountName": { "name": "camel.kamelet.azure-eventhubs-source.blobAccountName", "description": "The name of the Storage Blob account.", - "priority": "HIGH", + "priority": "MEDIUM", "required": "true" }, "camel.kamelet.azure-eventhubs-source.blobContainerName": { "name": "camel.kamelet.azure-eventhubs-source.blobContainerName", "description": "The name of the Storage Blob container.", - "priority": "HIGH", + "priority": "MEDIUM", "required": "true" }, "camel.kamelet.azure-eventhubs-source.blobAccessKey": { "name": "camel.kamelet.azure-eventhubs-source.blobAccessKey", "description": "The key for the Azure Storage Blob service that is associated with the Storage Blob account name.", - "priority": "HIGH", + "priority": "MEDIUM", "required": "true" }, "camel.kamelet.azure-eventhubs-source.credentialType": { diff --git a/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/CamelAzureeventhubssourceSourceConnectorConfig.java b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/CamelAzureeventhubssourceSourceConnectorConfig.java index 673b3a6c60..5860567a5e 100644 --- a/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/CamelAzureeventhubssourceSourceConnectorConfig.java +++ b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/CamelAzureeventhubssourceSourceConnectorConfig.java @@ -68,9 +68,9 @@ public static ConfigDef conf() { conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_EVENTHUB_NAME_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_EVENTHUB_NAME_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_EVENTHUB_NAME_DOC); conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_SHARED_ACCESS_NAME_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_SHARED_ACCESS_NAME_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_SHARED_ACCESS_NAME_DOC); conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_SHARED_ACCESS_KEY_CONF, ConfigDef.Type.PASSWORD, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_SHARED_ACCESS_KEY_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_SHARED_ACCESS_KEY_DOC); - conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_DOC); - conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_DOC); - conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_CONF, ConfigDef.Type.PASSWORD, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_DOC); + conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_DOC); + conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_DOC); + conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_CONF, ConfigDef.Type.PASSWORD, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_DOC); conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_CREDENTIAL_TYPE_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_CREDENTIAL_TYPE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_CREDENTIAL_TYPE_DOC); return conf; } diff --git a/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/CamelAzureeventhubssourceSourceTask.java b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/CamelAzureeventhubssourceSourceTask.java index efa2f5c496..cd2a88ae47 100644 --- a/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/CamelAzureeventhubssourceSourceTask.java +++ b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/CamelAzureeventhubssourceSourceTask.java @@ -16,22 +16,86 @@ */ package org.apache.camel.kafkaconnector.azureeventhubssource; -import java.util.HashMap; +import static org.apache.camel.kafkaconnector.azureeventhubssource.CamelAzureeventhubssourceSourceConnectorConfig.CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_CONF; +import static org.apache.camel.kafkaconnector.azureeventhubssource.CamelAzureeventhubssourceSourceConnectorConfig.CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_CONF; +import static org.apache.camel.kafkaconnector.azureeventhubssource.CamelAzureeventhubssourceSourceConnectorConfig.CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_CONF; + import java.util.Map; -import javax.annotation.Generated; -import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig; + +import org.apache.camel.Exchange; +import org.apache.camel.component.azure.eventhubs.EventHubsEndpoint; import org.apache.camel.kafkaconnector.CamelSourceTask; +import org.apache.camel.kafkaconnector.azureeventhubssource.checkpoint.EventHubOffsetCheckpointStore; +import org.apache.camel.kafkaconnector.azureeventhubssource.checkpoint.EventHubSourcePartition; +import org.apache.kafka.common.config.ConfigException; + -@Generated("This class has been generated by camel-kafka-connector-generator-maven-plugin, remove this annotation to prevent it from being generated.") public class CamelAzureeventhubssourceSourceTask extends CamelSourceTask { + private EventHubOffsetCheckpointStore checkpointStore; + + public CamelAzureeventhubssourceSourceTask() { + setEndpointCustomizer(endpoint -> { + if (checkpointStore != null && endpoint instanceof EventHubsEndpoint eventHubsEndpoint) { + eventHubsEndpoint.getConfiguration().setCheckpointStore(checkpointStore); + } + }); + } @Override - protected CamelSourceConnectorConfig getCamelSourceConnectorConfig( - Map props) { + protected CamelAzureeventhubssourceSourceConnectorConfig getCamelSourceConnectorConfig(Map props) { return new CamelAzureeventhubssourceSourceConnectorConfig(props); } + + @Override + public void start(Map properties) { + final var config = getCamelSourceConnectorConfig(properties); + if (!isBlobCheckpointStorageEnabled(config)) { + checkpointStore = new EventHubOffsetCheckpointStore(context.offsetStorageReader()); + } + super.start(properties); + } + @Override protected String getSourceKamelet() { return "kamelet:azure-eventhubs-source"; } -} \ No newline at end of file + + @Override + protected Map toSourcePartition(Exchange exchange) { + if (checkpointStore != null) { + return EventHubSourcePartition.of(exchange).toMap(); + } + return super.toSourcePartition(exchange); + } + + @Override + protected Map toSourceOffset(Exchange exchange) { + if (checkpointStore != null) { + return checkpointStore.getSourceOffset(exchange); + } + return super.toSourceOffset(exchange); + } + + private static boolean isBlobCheckpointStorageEnabled(CamelAzureeventhubssourceSourceConnectorConfig config) { + final var blobAccountName = config.getString(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_CONF); + final var blobContainerName = config.getPassword(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_CONF); + final var blobAccessKey = config.getPassword(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_CONF); + if (blobAccountName == null && blobContainerName == null && blobAccessKey == null) { + return false; + } + if (blobAccountName == null) { + throw incompleteBlobStorageConfigExeception(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_CONF); + } + if (blobContainerName == null) { + throw incompleteBlobStorageConfigExeception(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_CONF); + } + if (blobAccessKey == null) { + throw incompleteBlobStorageConfigExeception(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_CONF); + } + return true; + } + + private static ConfigException incompleteBlobStorageConfigExeception(String missingConfigName) { + throw new ConfigException(missingConfigName, null, "Incomplete Azure Blob Storage configuration"); + } +} diff --git a/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubCheckpointOffset.java b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubCheckpointOffset.java new file mode 100644 index 0000000000..2604ad4679 --- /dev/null +++ b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubCheckpointOffset.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.azureeventhubssource.checkpoint; + +import static java.util.Collections.unmodifiableMap; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.azure.messaging.eventhubs.models.Checkpoint; + +/** + * The source offset information about the Azure EventHub checkpoint. + */ +record EventHubCheckpointOffset(Map value) { + private static final Logger LOG = LoggerFactory.getLogger(EventHubOffsetCheckpointStore.class); + + private static final String OFFSET = "offset"; + private static final String SEQUENCE_NUMBER = "sequenceNo"; + + static EventHubCheckpointOffset of(Checkpoint checkpoint) { + var value = new HashMap(); + Optional.ofNullable(checkpoint.getOffset()).ifPresent(it -> value.put(OFFSET, it)); + Optional.ofNullable(checkpoint.getSequenceNumber()).ifPresent(it -> value.put(SEQUENCE_NUMBER, it)); + return new EventHubCheckpointOffset(unmodifiableMap(value)); + } + + static Optional tryParse(final Map value) { + final var result = new HashMap(); + if (value.get(OFFSET) instanceof Number offset) { + result.put(OFFSET, offset.longValue()); + } + if (value.get(SEQUENCE_NUMBER) instanceof Number sequenceNumber) { + result.put(SEQUENCE_NUMBER, sequenceNumber.longValue()); + } + if (result.isEmpty()) { + LOG.warn("Invalid checkpoint-value {}", value); + return Optional.empty(); + } + return Optional.of(new EventHubCheckpointOffset(unmodifiableMap(result))); + } + + Long offset() { + if (value.get(OFFSET) instanceof Long offset) { + return offset; + } + return null; + } + + Long sequenceNumber() { + if (value.get(SEQUENCE_NUMBER) instanceof Long sequenceNumber) { + return sequenceNumber; + } + return null; + } +} \ No newline at end of file diff --git a/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubConsumerInfo.java b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubConsumerInfo.java new file mode 100644 index 0000000000..0b403acc27 --- /dev/null +++ b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubConsumerInfo.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.azureeventhubssource.checkpoint; + +import static java.util.Collections.unmodifiableMap; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.messaging.eventhubs.models.PartitionOwnership; + +/** + * The source offset information about the Azure EventHub consumer. + */ +record EventHubConsumerInfo(Map value) { + private static final Logger LOG = LoggerFactory.getLogger(EventHubOffsetCheckpointStore.class); + + private static final String NAMESPACE = "namespace"; + private static final String EVENT_HUB = "eventHub"; + private static final String CONSUMER_GROUP = "consumerGroup"; + + static EventHubConsumerInfo of(String namespace, String eventHub, String consumerGroup) { + var value = new HashMap(); + Optional.ofNullable(namespace).ifPresent(it -> value.put(NAMESPACE, it)); + Optional.ofNullable(eventHub).ifPresent(it -> value.put(EVENT_HUB, it)); + Optional.ofNullable(consumerGroup).ifPresent(it -> value.put(CONSUMER_GROUP, it)); + return new EventHubConsumerInfo(unmodifiableMap(value)); + } + + static EventHubConsumerInfo of(Checkpoint checkpoint) { + return of(checkpoint.getFullyQualifiedNamespace(), + checkpoint.getEventHubName(), + checkpoint.getConsumerGroup()); + } + + static EventHubConsumerInfo of(PartitionOwnership partitionOwnership) { + return of(partitionOwnership.getFullyQualifiedNamespace(), + partitionOwnership.getEventHubName(), + partitionOwnership.getConsumerGroup()); + } + + static Optional tryParse(Map value) { + String fullyQualifiedNamespace = null; + String eventHubName = null; + String consumerGroup = null; + if (value.get(NAMESPACE) instanceof String it) { + fullyQualifiedNamespace = it; + } + if (value.get(NAMESPACE) instanceof String it) { + eventHubName = it; + } + if (value.get(CONSUMER_GROUP) instanceof String it) { + consumerGroup = it; + } + if (fullyQualifiedNamespace == null && eventHubName == null && consumerGroup == null) { + LOG.warn("Invalid offset-key {}", value); + return Optional.empty(); + } + return Optional.of(of(fullyQualifiedNamespace, eventHubName, consumerGroup)); + } + + String namespace() { + return value.get(NAMESPACE); + } + + String eventHub() { + return value.get(EVENT_HUB); + } + + String consumerGroup() { + return value.get(CONSUMER_GROUP); + } +} diff --git a/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubOffsetCheckpointStore.java b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubOffsetCheckpointStore.java new file mode 100644 index 0000000000..a89e6c7e60 --- /dev/null +++ b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubOffsetCheckpointStore.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.azureeventhubssource.checkpoint; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.camel.Exchange; +import org.apache.kafka.connect.storage.OffsetStorageReader; + +import com.azure.messaging.eventhubs.CheckpointStore; +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.messaging.eventhubs.models.PartitionOwnership; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + + +/** + * The implementation of the {@link CheckpointStore} using Kafka Connect offset topic. + *

+ * The checkpoints and partition ownership data is being stored in connector offset. + * The source partition is the name of the event hub, passed in the configuration. + *

+ */ +public final class EventHubOffsetCheckpointStore implements CheckpointStore { + private final OffsetStorageReader offsetStorageReader; + private final Map offsets; + + public EventHubOffsetCheckpointStore(final OffsetStorageReader offsetStorageReader) { + this.offsetStorageReader = offsetStorageReader; + offsets = new ConcurrentHashMap<>(); + } + + /** + * Returns the current source offset. + * + * @return the source offset value + */ + public Map getSourceOffset(Exchange exchange) { + final var partitionId = EventHubSourcePartition.of(exchange); + return getOffset(partitionId).toMap(); + } + + @Override + public Flux listOwnership(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) { + final var eventHubConsumerInfo = EventHubConsumerInfo.of(fullyQualifiedNamespace, eventHubName, consumerGroup); + return Flux.fromStream(offsets.entrySet().stream() + .map(entry -> entry.getValue().getPartitionOwnership(entry.getKey(), eventHubConsumerInfo)) + .mapMulti(Optional::ifPresent)); + } + + @Override + public Flux claimOwnership(final List requestedPartitionOwnerships) { + return Flux.fromIterable(requestedPartitionOwnerships) + .map(partitionOwnership -> { + final var sourcePartition = new EventHubSourcePartition(partitionOwnership.getPartitionId()); + getOffset(sourcePartition).putPartitionOwnership(partitionOwnership); + return partitionOwnership; + }); + } + + @Override + public Flux listCheckpoints(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) { + final var eventHubConsumerInfo = EventHubConsumerInfo.of(fullyQualifiedNamespace, eventHubName, consumerGroup); + return Flux.fromStream(offsets.entrySet().stream() + .map(entry -> entry.getValue().getCheckpoint(entry.getKey(), eventHubConsumerInfo)) + .mapMulti(Optional::ifPresent)); + } + + @Override + public Mono updateCheckpoint(final Checkpoint checkpoint) { + final var sourceParition = new EventHubSourcePartition(checkpoint.getPartitionId()); + return Mono.defer(() -> { + getOffset(sourceParition).putCheckpoint(checkpoint); + return Mono.empty(); + }); + } + + private EventHubSourceOffset getOffset(EventHubSourcePartition partitionId) { + return offsets.computeIfAbsent(partitionId, this::loadOffset); + } + + private EventHubSourceOffset loadOffset(final EventHubSourcePartition sourceParition) { + final var result = new EventHubSourceOffset(); + final var offset = offsetStorageReader.offset(sourceParition.toMap()); + if (offset != null) { + result.load(offset); + } + return result; + } +} diff --git a/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubPartitionOwnershipOffset.java b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubPartitionOwnershipOffset.java new file mode 100644 index 0000000000..3fbfb4e621 --- /dev/null +++ b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubPartitionOwnershipOffset.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.azureeventhubssource.checkpoint; + +import static java.util.Collections.unmodifiableMap; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.azure.messaging.eventhubs.models.PartitionOwnership; + +/** + * The source offset information about the Azure EventHub partition ownership. + */ +record EventHubPartitionOwnershipOffset(Map value) { + private static final Logger LOG = LoggerFactory.getLogger(EventHubPartitionOwnershipOffset.class); + + private static final String OWNER_ID = "ownerId"; + private static final String LAST_MODIFIED_TIME = "lastModified"; + private static final String ETAG = "etag"; + + static EventHubPartitionOwnershipOffset of(PartitionOwnership partitionOwnership) { + var key = new HashMap(); + Optional.ofNullable(partitionOwnership.getOwnerId()).ifPresent(it -> key.put(OWNER_ID, it)); + Optional.ofNullable(partitionOwnership.getLastModifiedTime()).ifPresent(it -> key.put(LAST_MODIFIED_TIME, it)); + Optional.ofNullable(partitionOwnership.getETag()).ifPresent(it -> key.put(ETAG, it)); + return new EventHubPartitionOwnershipOffset(unmodifiableMap(key)); + } + + static Optional tryParse(final Map value) { + final var result = new HashMap(); + if (value.get(OWNER_ID) instanceof String ownerId) { + result.put(OWNER_ID, ownerId); + } + if (value.get(LAST_MODIFIED_TIME) instanceof Number lastModified) { + result.put(LAST_MODIFIED_TIME, lastModified.longValue()); + } + if (value.get(ETAG) instanceof String etag) { + result.put(ETAG, etag); + } + if (result.isEmpty()) { + LOG.warn("Invalid checkpoint-value {}", value); + return Optional.empty(); + } + return Optional.of(new EventHubPartitionOwnershipOffset(unmodifiableMap(result))); + } + + String ownerId() { + if (value.get(OWNER_ID) instanceof String ownerId) { + return ownerId; + } + return null; + } + + Long lastModifiedTime() { + if (value.get(LAST_MODIFIED_TIME) instanceof Long lastModifiedTime) { + return lastModifiedTime; + } + return null; + } + + String eTag() { + if (value.get(ETAG) instanceof String eTag) { + return eTag; + } + return null; + } +} diff --git a/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubSourceOffset.java b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubSourceOffset.java new file mode 100644 index 0000000000..7930c0d755 --- /dev/null +++ b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubSourceOffset.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.azureeventhubssource.checkpoint; + +import static java.util.stream.Collectors.toUnmodifiableMap; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.messaging.eventhubs.models.PartitionOwnership; + +/** + * The source offset of the Azure EventHub source connector. + * + * The source offset is partitioned by {@link EventHubSourcePartition}. + */ +record EventHubSourceOffset(Map checkpoints, + Map partitionOwnerships) { + private static final String CHECKPOINT_KEY = "checkpoint"; + private static final String OWNERSHIP_KEY = "ownership"; + + EventHubSourceOffset() { + this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>()); + } + + void load(Map offset) { + if (offset.get(CHECKPOINT_KEY) instanceof Map checkpointsOffsets) { + loadCheckpoints(checkpointsOffsets); + } + if (offset.get(OWNERSHIP_KEY) instanceof Map partitionOwnerships) { + loadPartitionOwnerships(partitionOwnerships); + } + } + + private void loadCheckpoints(Map checkpointsOffsets) { + checkpointsOffsets.forEach((k, v) -> { + if (k instanceof Map key && v instanceof Map value) { + EventHubConsumerInfo.tryParse(key) + .ifPresent(consumerInfo -> EventHubCheckpointOffset.tryParse(value) + .ifPresent(checkpointOffset -> checkpoints.put(consumerInfo, checkpointOffset))); + } + }); + } + + private void loadPartitionOwnerships(Map partitionOwnershipsOffsets) { + partitionOwnershipsOffsets.forEach((k, v) -> { + if (k instanceof Map key && v instanceof Map value) { + EventHubConsumerInfo.tryParse(key) + .ifPresent(consumerInfo -> EventHubPartitionOwnershipOffset.tryParse(value) + .ifPresent(partitionOwnershipOffset -> partitionOwnerships.put(consumerInfo, partitionOwnershipOffset))); + } + }); + } + + void putCheckpoint(Checkpoint checkpoint) { + checkpoints.put(EventHubConsumerInfo.of(checkpoint), EventHubCheckpointOffset.of(checkpoint)); + } + + Optional getCheckpoint(EventHubSourcePartition sourcePartition, EventHubConsumerInfo eventHubConsumerInfo) { + return Optional.ofNullable(checkpoints.get(eventHubConsumerInfo)) + .map(value -> toCheckpoint(sourcePartition, eventHubConsumerInfo, value)); + } + + void putPartitionOwnership(PartitionOwnership partitionOwnership) { + partitionOwnerships.put(EventHubConsumerInfo.of(partitionOwnership), EventHubPartitionOwnershipOffset.of(partitionOwnership)); + } + + Optional getPartitionOwnership(EventHubSourcePartition sourcePartition, EventHubConsumerInfo eventHubConsumerInfo) { + return Optional.ofNullable(partitionOwnerships.get(eventHubConsumerInfo)) + .map(value -> toPartitionOwnership(sourcePartition, eventHubConsumerInfo, value)); + } + + Map toMap() { + return Map.of( + CHECKPOINT_KEY, + checkpoints.entrySet().stream().collect(toUnmodifiableMap( + entry -> entry.getKey().value(), + entry -> entry.getValue().value())), + OWNERSHIP_KEY, + partitionOwnerships.entrySet().stream().collect(toUnmodifiableMap( + entry -> entry.getKey().value(), + entry -> entry.getValue().value()))); + } + + private static Checkpoint toCheckpoint(EventHubSourcePartition sourcePartition, + EventHubConsumerInfo eventHubConsumerInfo, + EventHubCheckpointOffset checkpointOffset) { + final var checkpoint = new Checkpoint(); + checkpoint.setEventHubName(eventHubConsumerInfo.eventHub()); + checkpoint.setFullyQualifiedNamespace(eventHubConsumerInfo.namespace()); + checkpoint.setConsumerGroup(eventHubConsumerInfo.consumerGroup()); + checkpoint.setPartitionId(sourcePartition.partitionId()); + checkpoint.setOffset(checkpointOffset.offset()); + checkpoint.setSequenceNumber(checkpointOffset.sequenceNumber()); + return checkpoint; + } + + private static PartitionOwnership toPartitionOwnership(EventHubSourcePartition sourcePartition, + EventHubConsumerInfo eventHubConsumerInfo, + EventHubPartitionOwnershipOffset partitionOwnershipOffset) { + final var partitionOwnership = new PartitionOwnership(); + partitionOwnership.setEventHubName(eventHubConsumerInfo.eventHub()); + partitionOwnership.setFullyQualifiedNamespace(eventHubConsumerInfo.namespace()); + partitionOwnership.setConsumerGroup(eventHubConsumerInfo.consumerGroup()); + partitionOwnership.setPartitionId(sourcePartition.partitionId()); + partitionOwnership.setOwnerId(partitionOwnershipOffset.ownerId()); + partitionOwnership.setLastModifiedTime(partitionOwnershipOffset.lastModifiedTime()); + partitionOwnership.setETag(partitionOwnershipOffset.eTag()); + return partitionOwnership; + } +} diff --git a/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubSourcePartition.java b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubSourcePartition.java new file mode 100644 index 0000000000..733552dc28 --- /dev/null +++ b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/azureeventhubssource/checkpoint/EventHubSourcePartition.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.azureeventhubssource.checkpoint; + +import java.util.Map; + +import org.apache.camel.Exchange; +import org.apache.camel.component.azure.eventhubs.EventHubsConstants; + +/** + * The source partition of Azure EventHub source connector. + * + * @param partitionId the id of the EventHub partition + */ +public record EventHubSourcePartition(String partitionId) { + private static final String PARTITION_ID = "partitionId"; + + /** + * Returns {@link EventHubSourcePartition} from the given {@link Exchange}. + * The partition id is extracted from the CamelAzureEventHubsPartitionId header. + * + * @param exchange the exchange for which source partition is returned + * @return {@link EventHubSourcePartition} + */ + public static EventHubSourcePartition of(Exchange exchange) { + return new EventHubSourcePartition(exchange.getMessage().getHeader(EventHubsConstants.PARTITION_ID, String.class)); + } + + /** + * Returns map representation of the source partition. + * + * @return {@link Map} + */ + public Map toMap() { + return Map.of(PARTITION_ID, partitionId); + } +} \ No newline at end of file diff --git a/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/resources/kamelets/azure-eventhubs-source.kamelet.yaml b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/resources/kamelets/azure-eventhubs-source.kamelet.yaml index b9c5cfddd9..1088f8dc7d 100644 --- a/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/resources/kamelets/azure-eventhubs-source.kamelet.yaml +++ b/connectors/camel-azure-eventhubs-source-kafka-connector/src/main/resources/kamelets/azure-eventhubs-source.kamelet.yaml @@ -36,13 +36,10 @@ spec: There are two different mechanism of authentication `CONNECTION_STRING` and `AZURE_IDENTITY`, you could specify with credentialType property. If you're using `CONNECTION_STRING` sharedAccessName and sharedAccessKey properties will be needed. - In case of `AZURE_IDENTITY` selection, the DefaultAzureCredential will attempt to authenticate via the following mechanisms in the following order enviroment, Workload Identity, Managed Identity, Azure Developer CLI, IntelliJ, Azure CLI and Azure Powershell. + In case of `AZURE_IDENTITY` selection, the DefaultAzureCredential will attempt to authenticate via the following mechanisms in the following order enviroment, Workload Identity, Managed Identity, Azure Developer CLI, IntelliJ, Azure CLI and Azure Powershell. required: - namespaceName - eventhubName - - blobAccountName - - blobAccessKey - - blobContainerName type: object properties: namespaceName: @@ -100,9 +97,9 @@ spec: parameters: sharedAccessName: "{{?sharedAccessName}}" sharedAccessKey: "{{?sharedAccessKey}}" - blobAccountName: "{{blobAccountName}}" - blobAccessKey: "{{blobAccessKey}}" - blobContainerName: "{{blobContainerName}}" + blobAccountName: "{{?blobAccountName}}" + blobAccessKey: "{{?blobAccessKey}}" + blobContainerName: "{{?blobContainerName}}" credentialType: "{{credentialType}}" steps: - to: "kamelet:sink" \ No newline at end of file diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 9f60ab9878..a9e3027a0a 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -23,8 +23,10 @@ import java.util.Date; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; import org.apache.camel.PollingConsumer; @@ -72,7 +74,7 @@ public class CamelSourceTask extends SourceTask { private SpscArrayQueue freeSlots; private boolean mapProperties; private boolean mapHeaders; - + private Consumer endpointCustomizer; @Override public String version() { @@ -103,23 +105,23 @@ public void start(Map props) { final String componentSchema = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF); final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF); final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF); - final int size = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF); - final long timeout = config.getLong(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF); - final int maxRedeliveries = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF); - final long redeliveryDelay = config.getLong(CamelSourceConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF); - final String errorHandler = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF); - final Boolean idempotencyEnabled = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF); - final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF); - final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF); - final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF); - final String idempotentRepositoryType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF); - final String idempotentRepositoryKafkaTopic = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF); - final String idempotentRepositoryBootstrapServers = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF); - final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF); - final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF); - final String headersRemovePattern = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF); - mapProperties = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF); - mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF); + final int size = config.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF); + final long timeout = config.getLong(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF); + final int maxRedeliveries = config.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF); + final long redeliveryDelay = config.getLong(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF); + final String errorHandler = config.getString(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF); + final Boolean idempotencyEnabled = config.getBoolean(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF); + final String expressionType = config.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF); + final String expressionHeader = config.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF); + final int memoryDimension = config.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF); + final String idempotentRepositoryType = config.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF); + final String idempotentRepositoryKafkaTopic = config.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF); + final String idempotentRepositoryBootstrapServers = config.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF); + final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF); + final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF); + final String headersRemovePattern = config.getString(CamelConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF); + mapProperties = config.getBoolean(CamelConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF); + mapHeaders = config.getBoolean(CamelConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF); topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(","); @@ -172,8 +174,11 @@ public Integer get() { .withIdempotentRepositoryKafkaPollDuration(idempotentRepositoryKafkaPollDuration) .withHeadersExcludePattern(headersRemovePattern) .build(camelContext); - - consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer(); + Endpoint endpoint = cms.getCamelContext().getEndpoint(localUrl); + if (endpointCustomizer != null) { + endpointCustomizer.accept(endpoint); + } + consumer = endpoint.createPollingConsumer(); consumer.start(); cms.start(); @@ -184,11 +189,12 @@ public Integer get() { } } + protected String getSourceKamelet() { return DEFAULT_KAMELET_CKC_SOURCE; } - private long remaining(long startPollEpochMilli, long maxPollDuration) { + private static long remaining(long startPollEpochMilli, long maxPollDuration) { return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli); } @@ -211,10 +217,8 @@ public synchronized List poll() { LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(), exchange.getMessage().getMessageId(), exchange.getFromEndpoint()); - // TODO: see if there is a better way to use sourcePartition - // an sourceOffset - Map sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString()); - Map sourceOffset = Collections.singletonMap("position", exchange.getExchangeId()); + Map sourcePartition = toSourcePartition(exchange); + Map sourceOffset = toSourceOffset(exchange); final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null; Object messageBodyValue = exchange.getMessage().getBody(); @@ -225,10 +229,9 @@ public synchronized List poll() { final long timestamp = calculateTimestamp(exchange); // take in account Cached camel streams - if (messageBodyValue instanceof StreamCache) { - StreamCache sc = (StreamCache) messageBodyValue; + if (messageBodyValue instanceof StreamCache streamCache) { // reset to be sure that the cache is ready to be used before sending it in the record (could be useful for SMTs) - sc.reset(); + streamCache.reset(); } for (String singleTopic : topics) { CamelSourceRecord camelRecord = new CamelSourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema, @@ -239,7 +242,7 @@ public synchronized List poll() { setAdditionalHeaders(camelRecord, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX); } } - + if (mapProperties) { if (exchange.hasProperties()) { setAdditionalHeaders(camelRecord, exchange.getProperties(), PROPERTY_CAMEL_PREFIX); @@ -330,49 +333,44 @@ protected long calculateTimestamp(Exchange exchange) { return System.currentTimeMillis(); } - private void setAdditionalHeaders(SourceRecord record, Map map, String prefix) { + private static void setAdditionalHeaders(SourceRecord record, Map map, String prefix) { for (Map.Entry entry : map.entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); String keyCamelHeader = prefix + key; - if (value instanceof String) { - record.headers().addString(keyCamelHeader, (String)value); - } else if (value instanceof Boolean) { - record.headers().addBoolean(keyCamelHeader, (boolean)value); - } else if (value instanceof Byte) { - record.headers().addByte(keyCamelHeader, (byte)value); - } else if (value instanceof Byte[]) { - final Byte[] array = (Byte[])value; - final byte[] bytes = new byte[array.length]; - - for (int i = 0; i < array.length; i++) { - bytes[i] = array[i]; - } - + if (value instanceof String stringValue) { + record.headers().addString(keyCamelHeader, stringValue); + } else if (value instanceof Boolean booleanValue) { + record.headers().addBoolean(keyCamelHeader, booleanValue); + } else if (value instanceof Byte byteValue) { + record.headers().addByte(keyCamelHeader, byteValue); + } else if (value instanceof Byte[] byteArray) { + final byte[] bytes = new byte[byteArray.length]; + System.arraycopy(byteArray, 0, bytes, 0, bytes.length); record.headers().addBytes(keyCamelHeader, bytes); - } else if (value instanceof Date) { - record.headers().addTimestamp(keyCamelHeader, (Date)value); - } else if (value instanceof BigDecimal) { + } else if (value instanceof Date dateValue) { + record.headers().addTimestamp(keyCamelHeader, dateValue); + } else if (value instanceof BigDecimal decimalValue) { //XXX: kafka connect configured header converter takes care of the encoding, //default: org.apache.kafka.connect.storage.SimpleHeaderConverter - record.headers().addDecimal(keyCamelHeader, (BigDecimal)value); - } else if (value instanceof Double) { - record.headers().addDouble(keyCamelHeader, (double)value); - } else if (value instanceof Float) { - record.headers().addFloat(keyCamelHeader, (float)value); - } else if (value instanceof Integer) { - record.headers().addInt(keyCamelHeader, (int)value); - } else if (value instanceof Long) { - record.headers().addLong(keyCamelHeader, (long)value); - } else if (value instanceof Short) { - record.headers().addShort(keyCamelHeader, (short)value); + record.headers().addDecimal(keyCamelHeader, decimalValue); + } else if (value instanceof Double doubleValue) { + record.headers().addDouble(keyCamelHeader, doubleValue); + } else if (value instanceof Float floatValue) { + record.headers().addFloat(keyCamelHeader, floatValue); + } else if (value instanceof Integer intValue) { + record.headers().addInt(keyCamelHeader, intValue); + } else if (value instanceof Long longValue) { + record.headers().addLong(keyCamelHeader, longValue); + } else if (value instanceof Short shortValue) { + record.headers().addShort(keyCamelHeader, shortValue); } } } - private String getLocalUrlWithPollingOptions(long pollingConsumerQueueSize, long pollingConsumerBlockTimeout, boolean pollingConsumerBlockWhenFull) { + private static String getLocalUrlWithPollingOptions(long pollingConsumerQueueSize, long pollingConsumerBlockTimeout, boolean pollingConsumerBlockWhenFull) { return LOCAL_URL + "?pollingConsumerQueueSize=" + pollingConsumerQueueSize + "&pollingConsumerBlockTimeout=" + pollingConsumerBlockTimeout + "&pollingConsumerBlockWhenFull=" + pollingConsumerBlockWhenFull; } @@ -388,4 +386,16 @@ public LoggingLevel getLoggingLevel() { public void setLoggingLevel(LoggingLevel loggingLevel) { this.loggingLevel = loggingLevel; } + + protected void setEndpointCustomizer(Consumer endpointCustomizer) { + this.endpointCustomizer = endpointCustomizer; + } + + protected Map toSourcePartition(Exchange exchange) { + return Collections.singletonMap("filename", exchange.getFromEndpoint().toString()); + } + + protected Map toSourceOffset(Exchange exchange) { + return Collections.singletonMap("position", exchange.getExchangeId()); + } }