Karate-Kafka adds first-class support for testing Kafka on both the producing and consuming sides. Additional keywords make it easy to produce Kafka messages the same way you are used to making HTTP requests. The challenge of consuming messages asynchronously is solved via an elegant API.
- Unified syntax similar to HTTP but focused on Kafka
- Flexibility to set up multiple async listeners
- Mix HTTP and Kafka calls within the same test flow
- Support for parallel execution
- Support for performance testing
- Express data as JSON and leverage Karate's powerful assertions
- Avro, Protobuf or plain JSON serialization support
- Use Avro or Protobuf schemas directly, no code-generation required
- Kafka schema registry is optional, use schemas directly from files
- Support for SSL/TLS and certificate-based authentication
- Includes an explanation and demo that you can watch on YouTube
To run Karate tests using this library, you need a license from Karate Labs. You can email [email protected] to request a license.
To develop and run feature files from the IDE, you need to upgrade to the paid versions of the official Karate Labs plugins for IntelliJ or VS Code.
You need a Maven or Gradle project. The dependency info can be found here: https://central.sonatype.com/artifact/io.karatelabs/karate-kafka. Please use the latest available version.
The `karate.lic` file you receive should be placed in a `.karate` folder in your project root. You can also change the default path where the license is expected by setting a `KARATE_LICENSE_PATH` environment property.
You can find a sample project here: Karate Kafka Example
Any valid Kafka configuration can be set via the `configure kafka` keyword, and variables and JSON embedded expressions work just like you expect in Karate. For example:
* configure kafka =
"""
{
'bootstrap.servers': 'localhost:29092'
}
"""
For an example of configuring MTLS / SSL, refer to the kafka-mtls-example project.
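That project has the full details, but as a rough sketch, an SSL setup passes the standard Kafka client security properties; all paths, ports and passwords below are placeholders:

* configure kafka =
"""
{
  'bootstrap.servers': 'localhost:29092',
  'security.protocol': 'SSL',
  'ssl.truststore.location': 'src/test/resources/truststore.jks',
  'ssl.truststore.password': 'changeit',
  'ssl.keystore.location': 'src/test/resources/keystore.jks',
  'ssl.keystore.password': 'changeit',
  'ssl.key.password': 'changeit'
}
"""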
If needed, use the `register` keyword to set up mappings from JSON to Avro or Protobuf. For example:
* register { name: 'hello', path: 'classpath:karate/hello.avsc' }
The name you give here can be referenced later in `session.schema` and the `schema` keyword.
For Protobuf, you also need to specify the message name, since a `*.proto` file can have multiple definitions.
* register { name: 'hello-proto', path: 'classpath:karate/hello.proto', message: 'Hello' }
If your protobuf files import other files, you can specify "search roots" as follows:
* register { name: 'hello-proto', path: 'classpath:karate/hello.proto', message: 'Hello', roots: ['classpath:karate'] }
When producing, use the `schema` keyword to refer to a previously registered schema.
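For example, referring to the `hello` schema registered earlier:

* schema hello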
When producing, use the `header` keyword to set one Kafka header at a time.
For example:
* header foo = 'bar'
When producing, use the `headers` keyword to set all Kafka headers in one shot.
For example:
* headers { foo: 'bar1', baz: 'ban1' }
When producing, use the `key` keyword to set the Kafka message key.
Example:
* key 'first'
When producing, use the `value` keyword to set the Kafka message value. If you specify a `schema`, the JSON will be converted to Avro automatically.
Example:
* value { message: 'hello', info: { first: 1, second: true } }
You can also use the multi-line "docstring" syntax.
* value
"""
{
"meta": {
"metaId": "123",
"metaType": "AAA",
"metaChildren": [{ "name": "foo", "status": "ONE" }]
},
"payload": {
"payloadId": "456",
"payloadType": null,
"payloadEnum": "FIRST",
"payloadChild": {"field1": "foo", "field2": "bar"}
}
}
Async handling requires a little more complexity than simple API tests, but `karate-kafka` still keeps it simple. Here is an example:
* def session = karate.consume('kafka')
* session.topic = 'test-topic'
* session.start()
Note how the syntax is future-proof, and support for other async protocols such as `grpc` and `websocket` is very similar.
Typically you name the variable returned by `karate.consume()` as `session`. Now you can set properties before calling `session.start()`.
Behind the scenes a new Kafka consumer with a fresh group-id is created. Please provide feedback if you need a different model for your environment.
Use `session.topic` to set the topic to listen to.
Use `session.count` to tell Karate how many messages to wait for when consuming. Defaults to 1.
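For example, to make `session.collect()` wait for two messages:

* session.count = 2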
When consuming, use `session.schema` to refer to a previously registered schema.
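For example, assuming the `hello` schema registered earlier:

* session.schema = 'hello'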
`session.filter` is an optional way to filter for only the kinds of messages you want to collect. You can use JS functions and be very dynamic. For example:
* session.filter = x => x.key != 'zero'
Call `session.start()` to start the listener process. To complete the test flow, call `session.collect()`.
Since Kafka and async listeners can span or "collect" multiple messages, the result is always an array. Within each object you can unpack the `key`, `offset`, `headers` and `value`. Everything is JSON, just like you expect in Karate.
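As a sketch, here is how you could unpack the first collected message; this assumes the producer set a header named `foo` (as in the `headers` example above) and that header values come back as plain strings:

* def response = session.collect()
* def first = response[0]
* match first.key == 'first'
* match first.headers.foo == 'bar'
* print 'offset:', first.offset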
Here is a simple example that sends plain JSON (serialized to bytes) and listens on the same topic.
Feature: karate-kafka demo
Background:
* configure kafka =
"""
{
'bootstrap.servers': '127.0.0.1:29092'
}
"""
Scenario:
* def session = karate.consume('kafka')
* session.topic = 'test-topic'
* session.count = 1
* session.start()
* topic 'test-topic'
* key 'first'
* value { message: 'hello', info: { first: 1, second: true } }
* produce kafka
* def response = session.collect()
* match response[0].key == 'first'
* match response[0].value == { message: 'hello', info: { first: 1, second: true } }
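To tie the schema-related keywords together, here is a sketch of an Avro round-trip; it assumes the `hello` schema registered earlier, and `hello-topic` is a hypothetical topic name:

* register { name: 'hello', path: 'classpath:karate/hello.avsc' }
* def session = karate.consume('kafka')
* session.topic = 'hello-topic'
* session.schema = 'hello'
* session.count = 1
* session.start()
* topic 'hello-topic'
* schema hello
* key 'first'
* value { message: 'hello', info: { first: 1, second: true } }
* produce kafka
* def response = session.collect()
* match response[0].value == { message: 'hello', info: { first: 1, second: true } }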