Skip to content

Commit

Permalink
apache#1662 Implementation of the EventHubOffsetCheckpointStore
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubmalek committed Aug 9, 2024
1 parent 2003ae6 commit 4b5bfaa
Show file tree
Hide file tree
Showing 11 changed files with 688 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@
"camel.kamelet.azure-eventhubs-source.blobAccountName": {
"name": "camel.kamelet.azure-eventhubs-source.blobAccountName",
"description": "The name of the Storage Blob account.",
"priority": "HIGH",
"priority": "MEDIUM",
"required": "true"
},
"camel.kamelet.azure-eventhubs-source.blobContainerName": {
"name": "camel.kamelet.azure-eventhubs-source.blobContainerName",
"description": "The name of the Storage Blob container.",
"priority": "HIGH",
"priority": "MEDIUM",
"required": "true"
},
"camel.kamelet.azure-eventhubs-source.blobAccessKey": {
"name": "camel.kamelet.azure-eventhubs-source.blobAccessKey",
"description": "The key for the Azure Storage Blob service that is associated with the Storage Blob account name.",
"priority": "HIGH",
"priority": "MEDIUM",
"required": "true"
},
"camel.kamelet.azure-eventhubs-source.credentialType": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ public static ConfigDef conf() {
conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_EVENTHUB_NAME_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_EVENTHUB_NAME_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_EVENTHUB_NAME_DOC);
conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_SHARED_ACCESS_NAME_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_SHARED_ACCESS_NAME_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_SHARED_ACCESS_NAME_DOC);
conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_SHARED_ACCESS_KEY_CONF, ConfigDef.Type.PASSWORD, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_SHARED_ACCESS_KEY_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_SHARED_ACCESS_KEY_DOC);
conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_DOC);
conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_DOC);
conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_CONF, ConfigDef.Type.PASSWORD, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_DOC);
conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_DOC);
conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_DOC);
conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_CONF, ConfigDef.Type.PASSWORD, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_DOC);
conf.define(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_CREDENTIAL_TYPE_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_CREDENTIAL_TYPE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_CREDENTIAL_TYPE_DOC);
return conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,86 @@
*/
package org.apache.camel.kafkaconnector.azureeventhubssource;

import java.util.HashMap;
import static org.apache.camel.kafkaconnector.azureeventhubssource.CamelAzureeventhubssourceSourceConnectorConfig.CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_CONF;
import static org.apache.camel.kafkaconnector.azureeventhubssource.CamelAzureeventhubssourceSourceConnectorConfig.CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_CONF;
import static org.apache.camel.kafkaconnector.azureeventhubssource.CamelAzureeventhubssourceSourceConnectorConfig.CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_CONF;

import java.util.Map;
import javax.annotation.Generated;
import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig;

import org.apache.camel.Exchange;
import org.apache.camel.component.azure.eventhubs.EventHubsEndpoint;
import org.apache.camel.kafkaconnector.CamelSourceTask;
import org.apache.camel.kafkaconnector.azureeventhubssource.checkpoint.EventHubOffsetCheckpointStore;
import org.apache.camel.kafkaconnector.azureeventhubssource.checkpoint.EventHubSourcePartition;
import org.apache.kafka.common.config.ConfigException;


@Generated("This class has been generated by camel-kafka-connector-generator-maven-plugin, remove this annotation to prevent it from being generated.")
public class CamelAzureeventhubssourceSourceTask extends CamelSourceTask {
private EventHubOffsetCheckpointStore checkpointStore;

public CamelAzureeventhubssourceSourceTask() {
setEndpointCustomizer(endpoint -> {
if (checkpointStore != null && endpoint instanceof EventHubsEndpoint eventHubsEndpoint) {
eventHubsEndpoint.getConfiguration().setCheckpointStore(checkpointStore);
}
});
}

@Override
protected CamelSourceConnectorConfig getCamelSourceConnectorConfig(
Map<String, String> props) {
protected CamelAzureeventhubssourceSourceConnectorConfig getCamelSourceConnectorConfig(Map<String, String> props) {
return new CamelAzureeventhubssourceSourceConnectorConfig(props);
}

@Override
public void start(Map<String, String> properties) {
final var config = getCamelSourceConnectorConfig(properties);
if (!isBlobCheckpointStorageEnabled(config)) {
checkpointStore = new EventHubOffsetCheckpointStore(context.offsetStorageReader());
}
super.start(properties);
}

@Override
protected String getSourceKamelet() {
return "kamelet:azure-eventhubs-source";
}
}

@Override
protected Map<String, ?> toSourcePartition(Exchange exchange) {
if (checkpointStore != null) {
return EventHubSourcePartition.of(exchange).toMap();
}
return super.toSourcePartition(exchange);
}

@Override
protected Map<String, ?> toSourceOffset(Exchange exchange) {
if (checkpointStore != null) {
return checkpointStore.getSourceOffset(exchange);
}
return super.toSourceOffset(exchange);
}

private static boolean isBlobCheckpointStorageEnabled(CamelAzureeventhubssourceSourceConnectorConfig config) {
final var blobAccountName = config.getString(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_CONF);
final var blobContainerName = config.getPassword(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_CONF);
final var blobAccessKey = config.getPassword(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_CONF);
if (blobAccountName == null && blobContainerName == null && blobAccessKey == null) {
return false;
}
if (blobAccountName == null) {
throw incompleteBlobStorageConfigExeception(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCOUNT_NAME_CONF);
}
if (blobContainerName == null) {
throw incompleteBlobStorageConfigExeception(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_CONTAINER_NAME_CONF);
}
if (blobAccessKey == null) {
throw incompleteBlobStorageConfigExeception(CAMEL_SOURCE_AZUREEVENTHUBSSOURCE_KAMELET_BLOB_ACCESS_KEY_CONF);
}
return true;
}

private static ConfigException incompleteBlobStorageConfigExeception(String missingConfigName) {
throw new ConfigException(missingConfigName, null, "Incomplete Azure Blob Storage configuration");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.azureeventhubssource.checkpoint;

import static java.util.Collections.unmodifiableMap;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.azure.messaging.eventhubs.models.Checkpoint;

/**
* The source offset information about the Azure EventHub checkpoint.
*/
record EventHubCheckpointOffset(Map<String, Object> value) {
private static final Logger LOG = LoggerFactory.getLogger(EventHubOffsetCheckpointStore.class);

private static final String OFFSET = "offset";
private static final String SEQUENCE_NUMBER = "sequenceNo";

static EventHubCheckpointOffset of(Checkpoint checkpoint) {
var value = new HashMap<String, Object>();
Optional.ofNullable(checkpoint.getOffset()).ifPresent(it -> value.put(OFFSET, it));
Optional.ofNullable(checkpoint.getSequenceNumber()).ifPresent(it -> value.put(SEQUENCE_NUMBER, it));
return new EventHubCheckpointOffset(unmodifiableMap(value));
}

static Optional<EventHubCheckpointOffset> tryParse(final Map<?, ?> value) {
final var result = new HashMap<String, Object>();
if (value.get(OFFSET) instanceof Number offset) {
result.put(OFFSET, offset.longValue());
}
if (value.get(SEQUENCE_NUMBER) instanceof Number sequenceNumber) {
result.put(SEQUENCE_NUMBER, sequenceNumber.longValue());
}
if (result.isEmpty()) {
LOG.warn("Invalid checkpoint-value {}", value);
return Optional.empty();
}
return Optional.of(new EventHubCheckpointOffset(unmodifiableMap(result)));
}

Long offset() {
if (value.get(OFFSET) instanceof Long offset) {
return offset;
}
return null;
}

Long sequenceNumber() {
if (value.get(SEQUENCE_NUMBER) instanceof Long sequenceNumber) {
return sequenceNumber;
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.azureeventhubssource.checkpoint;

import static java.util.Collections.unmodifiableMap;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionOwnership;

/**
* The source offset information about the Azure EventHub consumer.
*/
record EventHubConsumerInfo(Map<String, String> value) {
private static final Logger LOG = LoggerFactory.getLogger(EventHubOffsetCheckpointStore.class);

private static final String NAMESPACE = "namespace";
private static final String EVENT_HUB = "eventHub";
private static final String CONSUMER_GROUP = "consumerGroup";

static EventHubConsumerInfo of(String namespace, String eventHub, String consumerGroup) {
var value = new HashMap<String, String>();
Optional.ofNullable(namespace).ifPresent(it -> value.put(NAMESPACE, it));
Optional.ofNullable(eventHub).ifPresent(it -> value.put(EVENT_HUB, it));
Optional.ofNullable(consumerGroup).ifPresent(it -> value.put(CONSUMER_GROUP, it));
return new EventHubConsumerInfo(unmodifiableMap(value));
}

static EventHubConsumerInfo of(Checkpoint checkpoint) {
return of(checkpoint.getFullyQualifiedNamespace(),
checkpoint.getEventHubName(),
checkpoint.getConsumerGroup());
}

static EventHubConsumerInfo of(PartitionOwnership partitionOwnership) {
return of(partitionOwnership.getFullyQualifiedNamespace(),
partitionOwnership.getEventHubName(),
partitionOwnership.getConsumerGroup());
}

static Optional<EventHubConsumerInfo> tryParse(Map<?, ?> value) {
String fullyQualifiedNamespace = null;
String eventHubName = null;
String consumerGroup = null;
if (value.get(NAMESPACE) instanceof String it) {
fullyQualifiedNamespace = it;
}
if (value.get(NAMESPACE) instanceof String it) {
eventHubName = it;
}
if (value.get(CONSUMER_GROUP) instanceof String it) {
consumerGroup = it;
}
if (fullyQualifiedNamespace == null && eventHubName == null && consumerGroup == null) {
LOG.warn("Invalid offset-key {}", value);
return Optional.empty();
}
return Optional.of(of(fullyQualifiedNamespace, eventHubName, consumerGroup));
}

String namespace() {
return value.get(NAMESPACE);
}

String eventHub() {
return value.get(EVENT_HUB);
}

String consumerGroup() {
return value.get(CONSUMER_GROUP);
}
}
Loading

0 comments on commit 4b5bfaa

Please sign in to comment.