This is a dynamic Kafka Streams routing service built with Spring Boot, which:
- consumes JSON messages,
- dynamically filters them based on fields provided in REST API
- Routes filtered records to corresponding output topics
- Unmatched or malformed messages are optionally routed to a others-topic.
- Streams are started and stopped dynamically via REST endpoints and monitored by /status api
- Dynamic stream routing using
BranchedKStream - JSON message support with custom
JsonNodeSerde - External configuration via
application.properties - Graceful error handling
- REST APIs to control the stream lifecycle
- Spring Boot integration with Kafka Streams
- Docker support
| Endpoint | Method | Description |
|---|---|---|
/streams/start |
POST | Starts a new Kafka Streams topology based on request body |
/streams/stop |
POST | Stops the currently running Kafka Streams topology |
/streams/status |
GET | Returns whether the stream is currently running or stopped |
json
{
"inputTopic": "input-topic",
"outputs": [
{ "filter": "employeeId", "outputTopic": "employees-topic" },
{ "filter": "userId", "outputTopic": "users-topic" }
]
}
- Any JSON key present in the filter field will be matched.
- Records that don’t match any branch will be sent to others topic.
src/
├── main/java/com/start/kafkastreams/
│ ├── JsonNodeSerde.java # Custom serializer/deserializer
│ ├── KafkaStreamsService.java # Builds & runs the dynamic topology
│ ├── StreamBaseConfig.java # Kafka Streams configuration bean
│ └── DynamicStreamController.java # REST controller to trigger stream
| └── KafkastreamApplication.java # SpringBootApplication
└── StreamStartRequest.java # POJO for /start API's request-body
└── resources/
└── application.properties # Kafka & stream configs
- Kafka installed and running with expected input and output topics configured
- Java installed and configured
- Docker installed
./mvnw clean package
This will generate a JAR file in the target/ directory, e.g., kafkastreams.jar.
FROM openjdk:17-jdk-slim
WORKDIR /app
COPY target/kafkastreams.jar app.jar
ENTRYPOINT ["java", "-jar", "app.jar"]
✅ Note: Place your application.properties file inside a config/ folder at the project root.
docker build -t kafkastreams-dynamic-router .
docker run -d \
-p 8085:8085 \
-v $(pwd)/config/application.properties:/app/config/application.properties \
kafkastreams-router
Note: external application.properties is used
- Java 17+
- Spring Boot 3.x
- Apache Kafka 3.6+
- Kafka Streams API
- Jackson for JSON (JsonNode)
- Docker
For further reference, please consider the following sections:
