Skip to content

Messaging

Stetson Robinson edited this page Apr 10, 2020 · 5 revisions

IMPORTANT: This Kogito wiki is deprecated. For the latest Kogito documentation, see the Kogito documentation page. To contribute to Kogito documentation, see the master-kogito branch of the kie-docs repository in GitHub.

Messaging that can be both input and output of the process execution.

Message based process image

This is realised by message start and end events that require

  • message name which maps to the actual channel that delivers messages

  • message payload which maps to a process instance variable

Message definition

Configuration of the marshalling is dependant on the actual messaging solution being used.

Quarkus

Apache Kafka Configuration

Complete project can be used as source reference

Install and configure Apache Kafka

Install Apache Kafka in the preferred way and create new topics needed

Add dependency

Add following into the pom.xml of the application.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

Configure messaging channel

Edit application.properties file located under src/main/resources and add following entries

Add following for message start event

mp.messaging.incoming.travellers.connector=smallrye-kafka
mp.messaging.incoming.travellers.topic=travellers
mp.messaging.incoming.travellers.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

where .travellers. is the name that matches the message name of the message start event.

Add following for message end event

mp.messaging.outgoing.processedtravellers.connector=smallrye-kafka
mp.messaging.outgoing.processedtravellers.topic=processedtravellers
mp.messaging.outgoing.processedtravellers.value.serializer=org.apache.kafka.common.serialization.StringSerializer

where .processedtravellers. is the name that matches the message name of the message end event.

Under heavy load for producer there might be error thrown in case the back pressure is overloaded. Use following property to disable waiting for completion which should make the overall throughput higher

mp.messaging.outgoing.[channel-name].waitForWriteCompletion=false

SpringBoot

Apache Kafka Configuration

Complete project can be used as source reference

Install and configure Apache Kafka

Install Apache Kafka in the preferred way and create new topics needed.

Add dependency

Add following into the pom.xml of the application.

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>

Configure messaging channel

Edit application.properties file located under src/main/resources and add following entries

Add following for message start event

kafka.bootstrapAddress=localhost:9092

Create beans for incoming messages

Following beans should be created to support incoming messages from Kafka to start process instances

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "travellers-group");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

Create beans for outgoing messages

Following beans should be created to support incoming messages from Kafka to start process instances

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;


    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}