diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java index 95a23dd85..134da31cc 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java @@ -115,7 +115,6 @@ public C createRegisteredContainer(PulsarListenerEndpoint endpoint) { endpoint.setupListenerContainer(instance, this.messageConverter); initializeContainer(instance, endpoint); - // customizeContainer(instance); return instance; } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java index ce06c3579..c3836768d 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java @@ -37,6 +37,7 @@ * @author Chris Bono * @author Alexander Preuß * @author Vedran Pavic + * @author Daniel Szabo */ public class ConcurrentPulsarListenerContainerFactory extends AbstractPulsarListenerContainerFactory, T> { @@ -80,6 +81,7 @@ protected ConcurrentPulsarMessageListenerContainer createContainerInstance(Pu var containerProps = new PulsarContainerProperties(); // Map factory props (defaults) to the container props + containerProps.setConsumerTaskExecutor(factoryProps.getConsumerTaskExecutor()); containerProps.setSchemaResolver(factoryProps.getSchemaResolver()); containerProps.setTopicResolver(factoryProps.getTopicResolver()); containerProps.setSubscriptionType(factoryProps.getSubscriptionType()); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java index b5dfcd450..b09562799 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java @@ -26,6 +26,7 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.listener.PulsarContainerProperties; @@ -142,4 +143,26 @@ void defaultUsedWhenNotSetOnEndpointNorFactoryProps() { } + @Nested + class ConsumerTaskExecutor { + + @Test + @SuppressWarnings("unchecked") + void factoryValueCopiedWhenCreatingContainer() { + final var factoryProps = new PulsarContainerProperties(); + factoryProps.setConsumerTaskExecutor(new SimpleAsyncTaskExecutor()); + final var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + final var endpoint = mock(PulsarListenerEndpoint.class); + // Mockito by default returns 0 for Integer + when(endpoint.getConcurrency()).thenReturn(null); + + final var createdContainer = containerFactory.createRegisteredContainer(endpoint); + + final var containerProperties = createdContainer.getContainerProperties(); + assertThat(containerProperties.getConsumerTaskExecutor()).isEqualTo(factoryProps.getConsumerTaskExecutor()); + } + + } + }