Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.quarkus.smallrye.reactivemessaging.kafka.deployment;

import static io.quarkus.smallrye.reactivemessaging.kafka.deployment.SmallRyeReactiveMessagingKafkaProcessor.getChannelPropertyKey;
import static io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration.getChannelPropertyName;

import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -60,7 +60,7 @@ boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> channelsManagedB

String channelType = incoming ? "incoming" : "outgoing";
return isKafkaConnector.computeIfAbsent(channelType + "|" + channelName, ignored -> {
String connectorKey = getChannelPropertyKey(channelName, "connector", incoming);
String connectorKey = getChannelPropertyName(channelName, "connector", incoming);
String connector = getConfig()
.getOptionalValue(connectorKey, String.class)
.orElse("ignored");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import static io.quarkus.smallrye.reactivemessaging.kafka.HibernateOrmStateStore.HIBERNATE_ORM_STATE_STORE;
import static io.quarkus.smallrye.reactivemessaging.kafka.HibernateReactiveStateStore.HIBERNATE_REACTIVE_STATE_STORE;
import static io.quarkus.smallrye.reactivemessaging.kafka.RedisStateStore.REDIS_STATE_STORE;
import static io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration.getChannelIncomingPropertyName;
import static io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration.getChannelOutgoingPropertyName;
import static io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration.getChannelPropertyName;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -97,7 +100,7 @@ static boolean hasStateStoreConfig(String stateStoreName, Config config) {
}

static boolean hasDLQConfig(String channelName, Config config) {
String propertyKey = getChannelPropertyKey(channelName, "failure-strategy", true);
String propertyKey = getChannelIncomingPropertyName(channelName, "failure-strategy");
Optional<String> channelFailureStrategy = config.getOptionalValue(propertyKey, String.class);
Optional<String> failureStrategy = channelFailureStrategy.or(() -> getConnectorProperty("failure-strategy", config));

Expand All @@ -121,16 +124,6 @@ private static List<String> getChannelProperties(String keySuffix, Config config
return values;
}

static String channelPropertyFormat = "mp.messaging.%s.%s.%s";

static String getChannelPropertyKey(String channelName, String propertyName, boolean incoming) {
if ((channelName.charAt(0) != '"' || channelName.charAt(channelName.length() - 1) != '"')
&& channelName.contains(".")) {
channelName = "\"" + channelName + "\"";
}
return String.format(channelPropertyFormat, incoming ? "incoming" : "outgoing", channelName, propertyName);
}

@BuildStep
public void checkpointRedis(BuildProducer<AdditionalBeanBuildItem> additionalBean,
BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
Expand Down Expand Up @@ -219,7 +212,7 @@ void disableGracefulShutdown(List<ConnectorManagedChannelBuildItem> channelsMana
if (!discoveryState.isKafkaConnector(channelsManagedByConnectors, incoming, channelName)) {
continue;
}
String key = getChannelPropertyKey(channelName, incoming ? "graceful-shutdown" : "close-timeout", incoming);
String key = getChannelPropertyName(channelName, incoming ? "graceful-shutdown" : "close-timeout", incoming);
discoveryState.ifNotYetConfigured(key, () -> {
defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem(key, incoming ? "false" : "0"));
});
Expand Down Expand Up @@ -259,9 +252,9 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
Type outgoingType = getOutgoingTypeFromMethod(method);
processOutgoingType(discovery, outgoingType, (keySerializer, valueSerializer) -> {
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelPropertyKey(channelName, "key.serializer", false), keySerializer);
getChannelOutgoingPropertyName(channelName, "key.serializer"), keySerializer);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelPropertyKey(channelName, "value.serializer", false), valueSerializer);
getChannelOutgoingPropertyName(channelName, "value.serializer"), valueSerializer);

handleAdditionalProperties(channelName, false, discovery, config, keySerializer, valueSerializer);
}, generatedClass, reflection, alreadyGeneratedSerializers);
Expand Down Expand Up @@ -291,9 +284,9 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
Type replyType = injectionPointType.asParameterizedType().arguments().get(1);
processOutgoingType(discovery, requestType, (keySerializer, valueSerializer) -> {
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelPropertyKey(channelName, "key.serializer", false), keySerializer);
getChannelOutgoingPropertyName(channelName, "key.serializer"), keySerializer);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelPropertyKey(channelName, "value.serializer", false), valueSerializer);
getChannelOutgoingPropertyName(channelName, "value.serializer"), valueSerializer);
}, generatedClass, reflection, alreadyGeneratedSerializers);
extractKeyValueType(replyType, (key, value, isBatchType) -> {
Result keyDeserializer = deserializerFor(discovery, key, true, channelName, generatedClass, reflection,
Expand All @@ -302,18 +295,18 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
alreadyGeneratedDeserializers, alreadyGeneratedSerializers);

produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelPropertyKey(channelName, "reply.key.deserializer", false), keyDeserializer);
getChannelOutgoingPropertyName(channelName, "reply.key.deserializer"), keyDeserializer);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelPropertyKey(channelName, "reply.value.deserializer", false), valueDeserializer);
getChannelOutgoingPropertyName(channelName, "reply.value.deserializer"), valueDeserializer);
handleAdditionalProperties(channelName, false, discovery, config, keyDeserializer, valueDeserializer);
});
} else {
Type outgoingType = getOutgoingTypeFromChannelInjectionPoint(injectionPointType);
processOutgoingType(discovery, outgoingType, (keySerializer, valueSerializer) -> {
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelPropertyKey(channelName, "key.serializer", false), keySerializer);
getChannelOutgoingPropertyName(channelName, "key.serializer"), keySerializer);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelPropertyKey(channelName, "value.serializer", false), valueSerializer);
getChannelOutgoingPropertyName(channelName, "value.serializer"), valueSerializer);

handleAdditionalProperties(channelName, false, discovery, config, keySerializer, valueSerializer);
}, generatedClass, reflection, alreadyGeneratedSerializers);
Expand All @@ -324,9 +317,9 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
private void processKafkaTransactions(DefaultSerdeDiscoveryState discovery,
BuildProducer<RunTimeConfigurationDefaultBuildItem> config, String channelName, Type injectionPointType) {
if (injectionPointType != null && isKafkaTransactionsEmitter(injectionPointType)) {
String transactionalIdKey = getChannelPropertyKey(channelName, "transactional.id", false);
String enableIdempotenceKey = getChannelPropertyKey(channelName, "enable.idempotence", false);
String acksKey = getChannelPropertyKey(channelName, "acks", false);
String transactionalIdKey = getChannelOutgoingPropertyName(channelName, "transactional.id");
String enableIdempotenceKey = getChannelOutgoingPropertyName(channelName, "enable.idempotence");
String acksKey = getChannelOutgoingPropertyName(channelName, "acks");
LOGGER.infof("Transactional producer detected for channel '%s', setting following default config values: "
+ "'" + transactionalIdKey + "=${quarkus.application.name}-${channelName}', "
+ "'" + enableIdempotenceKey + "=true', "
Expand All @@ -349,12 +342,12 @@ private void processIncomingType(DefaultSerdeDiscoveryState discovery,
alreadyGeneratedDeserializers, alreadyGeneratedSerializers);

produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelPropertyKey(channelName, "key.deserializer", true), keyDeserializer);
getChannelIncomingPropertyName(channelName, "key.deserializer"), keyDeserializer);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelPropertyKey(channelName, "value.deserializer", true), valueDeserializer);
getChannelIncomingPropertyName(channelName, "value.deserializer"), valueDeserializer);
if (Boolean.TRUE.equals(isBatchType)) {
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelPropertyKey(channelName, "batch", true), "true");
getChannelIncomingPropertyName(channelName, "batch"), "true");
}

handleAdditionalProperties(channelName, true, discovery, config, keyDeserializer, valueDeserializer);
Expand Down Expand Up @@ -385,7 +378,7 @@ private void handleAdditionalProperties(String channelName, boolean incoming, De
}

result.additionalProperties.forEach((key, value) -> {
String configKey = getChannelPropertyKey(channelName, key, incoming);
String configKey = getChannelPropertyName(channelName, key, incoming);
produceRuntimeConfigurationDefaultBuildItem(discovery, config, configKey, value);
});
}
Expand Down Expand Up @@ -1101,7 +1094,7 @@ private void processAnnotationsForReflectiveClassPayload(IndexView index, Config
}

private boolean isSerdeJson(IndexView index, Config config, String channelName, boolean serializer, boolean isKey) {
String configKey = getChannelPropertyKey(channelName, (isKey ? "key" : "value") + "." +
String configKey = getChannelPropertyName(channelName, (isKey ? "key" : "value") + "." +
(serializer ? "serializer" : "deserializer"), !serializer);
ConfigValue configValue = config.getConfigValue(configKey);
if (configValue.getValue() != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.quarkus.smallrye.reactivemessaging.pulsar.deployment;

import static io.quarkus.smallrye.reactivemessaging.deployment.SmallRyeReactiveMessagingProcessor.getChannelPropertyKey;
import static io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration.getChannelPropertyName;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -52,7 +52,7 @@ boolean isPulsarConnector(List<ConnectorManagedChannelBuildItem> channelsManaged

String channelType = incoming ? "incoming" : "outgoing";
return isPulsarConnector.computeIfAbsent(channelType + "|" + channelName, ignored -> {
String connectorKey = getChannelPropertyKey(channelName, "connector", incoming);
String connectorKey = getChannelPropertyName(channelName, "connector", incoming);
String connector = getConfig()
.getOptionalValue(connectorKey, String.class)
.orElse("ignored");
Expand Down Expand Up @@ -108,7 +108,7 @@ boolean isProtobufGenerated(DotName className) {
}

boolean hasObjectMapperConfigSchema(Type type, String channelName, boolean incoming) {
String key = getChannelPropertyKey(channelName, "schema", incoming);
String key = getChannelPropertyName(channelName, "schema", incoming);
Optional<String> schema = getConfig().getOptionalValue(key, String.class);
return schema.isPresent() && schema.get().equals(SyntheticBeanBuilder.objectMapperSchemaId(type));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.smallrye.reactivemessaging.pulsar.deployment;

import static io.quarkus.smallrye.reactivemessaging.deployment.SmallRyeReactiveMessagingProcessor.getChannelPropertyKey;
import static io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration.getChannelIncomingPropertyName;
import static io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration.getChannelOutgoingPropertyName;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -102,16 +103,12 @@ void discoverDefaultSerdeConfig(DefaultSchemaDiscoveryState discovery,
}
}

private static String outgoingSchemaKey(String channelName) {
return getChannelPropertyKey(channelName, "schema", false);
}

private void processPulsarTransactions(DefaultSchemaDiscoveryState discovery,
BuildProducer<RunTimeConfigurationDefaultBuildItem> config,
String channelName,
Type injectionPointType) {
if (injectionPointType != null && isPulsarEmitter(injectionPointType)) {
String enableTransactionKey = getChannelPropertyKey(channelName, "enableTransaction", false);
String enableTransactionKey = getChannelOutgoingPropertyName(channelName, "enableTransaction");
log.infof("Transactional producer detected for channel '%s', setting following default config values: "
+ "'" + enableTransactionKey + "=true'", channelName);
produceRuntimeConfigurationDefaultBuildItem(discovery, config, enableTransactionKey, "true");
Expand All @@ -129,20 +126,17 @@ private void processIncomingType(DefaultSchemaDiscoveryState discovery,
objectMapperSchemaFor(SyntheticBeanBuilder.objectMapperSchemaId(value), value, syntheticBean);
} else {
String schema = schemaFor(discovery, value, syntheticBean);
produceRuntimeConfigurationDefaultBuildItem(discovery, config, incomingSchemaKey(channelName), schema);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelIncomingPropertyName(channelName, "schema"), schema);
}
}
if (Boolean.TRUE.equals(isBatch)) {
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelPropertyKey(channelName, "batchReceive", true), "true");
getChannelIncomingPropertyName(channelName, "batchReceive"), "true");
}
});
}

private static String incomingSchemaKey(String channelName) {
return getChannelPropertyKey(channelName, "schema", true);
}

private Type getInjectionPointType(AnnotationInstance annotation) {
return switch (annotation.target().kind()) {
case FIELD -> handleInstanceChannelInjection(annotation.target().asField().type());
Expand Down Expand Up @@ -318,7 +312,8 @@ private void processOutgoingType(DefaultSchemaDiscoveryState discovery,
objectMapperSchemaFor(SyntheticBeanBuilder.objectMapperSchemaId(value), value, syntheticBean);
} else {
String schema = schemaFor(discovery, value, syntheticBean);
produceRuntimeConfigurationDefaultBuildItem(discovery, config, outgoingSchemaKey(channelName), schema);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelOutgoingPropertyName(channelName, "schema"), schema);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,6 @@ public class SmallRyeReactiveMessagingProcessor {
static final String DEFAULT_VIRTUAL_THREADS_MAX_CONCURRENCY = "1024";
static final String INVOKER_SUFFIX = "_SmallRyeMessagingInvoker";

static String channelPropertyFormat = "mp.messaging.%s.%s.%s";

public static String getChannelPropertyKey(String channelName, String propertyName, boolean incoming) {
return String.format(channelPropertyFormat, incoming ? "incoming" : "outgoing",
channelName.contains(".") ? "\"" + channelName + "\"" : channelName, propertyName);
}

@BuildStep
FeatureBuildItem feature() {
return new FeatureBuildItem(Feature.MESSAGING);
Expand Down
Loading
Loading