4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
@@ -6,7 +6,7 @@ mockito = "5.23.0"
testcontainers = "1.21.4"

[libraries]
-kafka-bom = { group = "com.bakdata.kafka", name = "kafka-bom", version = "1.4.0" }
+kafka-bom = { group = "com.bakdata.kafka", name = "kafka-bom", version = "1.5.0" }
kafka-clients = { group = "org.apache.kafka", name = "kafka-clients" }
kafka-server-common = { group = "org.apache.kafka", name = "kafka-server-common" }
kafka-core = { group = "org.apache.kafka", name = "kafka_2.13" }
@@ -31,7 +31,7 @@ mockito-junit = { group = "org.mockito", name = "mockito-junit-jupiter", version
testcontainers-junit = { group = "org.testcontainers", name = "junit-jupiter", version.ref = "testcontainers" }
testcontainers-localstack = { group = "org.testcontainers", name = "localstack", version.ref = "testcontainers" }
testcontainers-azure = { group = "org.testcontainers", name = "azure", version.ref = "testcontainers" }
-fluentKafkaStreamsTests = { group = "com.bakdata.fluent-kafka-streams-tests", name = "fluent-kafka-streams-tests-junit5", version = "3.5.1" }
+fluentKafkaStreamsTests = { group = "com.bakdata.fluent-kafka-streams-tests", name = "fluent-kafka-streams-tests-junit5", version = "3.6.0" }
log4j-slf4j2 = { group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = "2.25.4" }

[plugins]
HeaderLargeMessagePayloadProtocol.java
@@ -33,8 +33,8 @@

final class HeaderLargeMessagePayloadProtocol implements LargeMessagePayloadProtocol {
static final String HEADER_PREFIX = "__" + PREFIX + "backed.";
-private static final String KEY_HEADER = HEADER_PREFIX + "key";
-private static final String VALUE_HEADER = HEADER_PREFIX + "value";
+static final String KEY_HEADER = HEADER_PREFIX + "key";
+static final String VALUE_HEADER = HEADER_PREFIX + "value";

static boolean usesHeaders(final Headers headers, final boolean isKey) {
return headers.lastHeader(getHeaderName(isKey)) != null;
LargeMessageDeserializer.java
@@ -24,8 +24,6 @@

package com.bakdata.kafka;

-import static com.bakdata.kafka.HeaderLargeMessagePayloadProtocol.getHeaderName;
-
import java.util.Map;
import java.util.Objects;
import lombok.NoArgsConstructor;
@@ -76,11 +74,7 @@ public T deserialize(final String topic, final Headers headers, final byte[] dat
Objects.requireNonNull(this.deserializer);
Objects.requireNonNull(this.client);
final byte[] bytes = this.client.retrieveBytes(data, headers, this.isKey);
-final T deserialized = this.deserializer.deserialize(topic, headers, bytes);
-// remove all headers associated with large message because the record might be serialized with different flags
-headers.remove(getHeaderName(this.isKey));
-headers.remove(CompressionType.HEADER_NAME);
-return deserialized;
+return this.deserializer.deserialize(topic, headers, bytes);

Member Author: I am not happy about this change, but it is necessary to make the DLQ work. The DLQ uses the most recent headers together with the original bytes, so if we stripped the headers here, they would be unavailable in the DLQ topic and the message could not be deserialized. The trade-off is that we now keep these headers on all downstream messages forever. (A minimal sketch of this failure mode follows this file's diff.)
}

@Override
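The trade-off described in the review comment above is easiest to see with the Kafka headers API itself. Below is a minimal sketch of the failure mode, assuming a hypothetical header name modeled on HeaderLargeMessagePayloadProtocol (the real name is derived from HEADER_PREFIX, and the flag byte is purely illustrative):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

public final class DlqHeaderSketch {
    // hypothetical header name; the library builds it from HEADER_PREFIX + "value"
    static final String VALUE_HEADER = "__large.message.backed.value";

    public static void main(final String[] args) {
        final Headers headers = new RecordHeaders();
        // producer side: mark the value as backed by external storage
        headers.add(VALUE_HEADER, new byte[]{1});
        // the record payload itself is only a URI pointing at the real data
        final byte[] originalBytes = "s3://bucket/key".getBytes(StandardCharsets.UTF_8);

        // A dead letter record is built from the ORIGINAL bytes plus the
        // CURRENT headers. If deserialize() stripped the header (the old
        // behavior), this check would fail for the DLQ consumer and the URI
        // bytes could no longer be resolved back to the actual payload.
        final boolean backed = headers.lastHeader(VALUE_HEADER) != null;
        System.out.println("DLQ record still recognizable as backed: " + backed);
    }
}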
LargeMessageDeserializerTest.java
@@ -24,6 +24,8 @@

package com.bakdata.kafka;

+import static com.bakdata.kafka.HeaderLargeMessagePayloadProtocol.KEY_HEADER;
+import static com.bakdata.kafka.HeaderLargeMessagePayloadProtocol.VALUE_HEADER;
import static com.bakdata.kafka.HeaderLargeMessagePayloadProtocol.getHeaderName;
import static com.bakdata.kafka.LargeMessagePayload.ofBytes;
import static com.bakdata.kafka.LargeMessagePayload.ofUri;
@@ -37,6 +39,7 @@
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
@@ -173,7 +176,10 @@ void shouldReadNonBackedTextValueWithHeaders() {
.hasSize(1)
.anySatisfy(producerRecord -> {
assertThat(producerRecord.value()).isEqualTo("foo");
-assertThat(producerRecord.headers()).isEmpty();
+assertThat(producerRecord.headers())
+.hasSize(1)
+.extracting(Header::key)
+.containsExactlyInAnyOrder(VALUE_HEADER);
});
}

@@ -227,7 +233,10 @@ void shouldReadNonBackedTextKeyWithHeaders() {
.hasSize(1)
.anySatisfy(producerRecord -> {
assertThat(producerRecord.key()).isEqualTo("foo");
-assertThat(producerRecord.headers()).isEmpty();
+assertThat(producerRecord.headers())
+.hasSize(1)
+.extracting(Header::key)
+.containsExactlyInAnyOrder(KEY_HEADER);
});
}

@@ -278,8 +287,6 @@ void shouldReadBackedTextValueWithHeaders() {
this.createTopology(LargeMessageDeserializerTest::createValueTopology);
final Headers headers = new RecordHeaders();
final byte[] value = createBackedText(bucket, key, headers, false);
-// add compression header for 'none' type, so we can assert it is also properly removed
-headers.add(CompressionType.HEADER_NAME, new byte[]{CompressionType.NONE.getId()});
this.topology.input()
.withKeySerde(Serdes.Integer())
.withValueSerde(Serdes.ByteArray())
@@ -292,7 +299,10 @@
.hasSize(1)
.anySatisfy(producerRecord -> {
assertThat(producerRecord.value()).isEqualTo("foo");
-assertThat(producerRecord.headers()).isEmpty();
+assertThat(producerRecord.headers())
+.hasSize(1)
+.extracting(Header::key)
+.containsExactlyInAnyOrder(VALUE_HEADER);
});
}

@@ -337,7 +347,10 @@ void shouldReadBackedTextKeyWithHeaders() {
.hasSize(1)
.anySatisfy(producerRecord -> {
assertThat(producerRecord.key()).isEqualTo("foo");
-assertThat(producerRecord.headers()).isEmpty();
+assertThat(producerRecord.headers())
+.hasSize(1)
+.extracting(Header::key)
+.containsExactlyInAnyOrder(KEY_HEADER);
});
}

@@ -380,7 +393,10 @@ void shouldReadNonBackedTextKeyAndBackedValueWithHeaders() {
.anySatisfy(producerRecord -> {
assertThat(producerRecord.key()).isEqualTo("foo");
assertThat(producerRecord.value()).isEqualTo("bar");
-assertThat(producerRecord.headers()).isEmpty();
+assertThat(producerRecord.headers())
+.hasSize(2)
+.extracting(Header::key)
+.containsExactlyInAnyOrder(KEY_HEADER, VALUE_HEADER);
});
}

@@ -405,7 +421,10 @@ void shouldReadBackedTextKeyAndNonBackedValueWithHeaders() {
.anySatisfy(producerRecord -> {
assertThat(producerRecord.key()).isEqualTo("foo");
assertThat(producerRecord.value()).isEqualTo("bar");
-assertThat(producerRecord.headers()).isEmpty();
+assertThat(producerRecord.headers())
+.hasSize(2)
+.extracting(Header::key)
+.containsExactlyInAnyOrder(KEY_HEADER, VALUE_HEADER);
});
}

LargeMessageSerdeTest.java
@@ -26,8 +26,6 @@

import static org.assertj.core.api.Assertions.assertThat;

-import com.bakdata.fluent_kafka_streams_tests.TestInput;
-import com.bakdata.fluent_kafka_streams_tests.TestOutput;
import com.bakdata.fluent_kafka_streams_tests.TestTopology;
import java.util.HashMap;
import java.util.List;
@@ -37,28 +35,28 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class LargeMessageSerdeTest extends AmazonS3IntegrationTest {

-private static final String INPUT_TOPIC_1 = "input1";
-private static final String INPUT_TOPIC_2 = "input2";
+private static final String INPUT_TOPIC = "input";
+private static final String JOIN_INPUT_TOPIC_1 = "input1";
+private static final String JOIN_INPUT_TOPIC_2 = "input2";
private static final String OUTPUT_TOPIC = "output";
-private TestTopology<Integer, String> topology;

-private static Topology createTopology() {
+private static Topology createJoinTopology() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> input1 =
-builder.stream(INPUT_TOPIC_1, Consumed.with(Serdes.String(), Serdes.String()))
+builder.stream(JOIN_INPUT_TOPIC_1, Consumed.with(Serdes.String(), Serdes.String()))
.selectKey((k, v) -> k.substring(0, 1))
.toTable();
final KTable<String, String> input2 =
-builder.stream(INPUT_TOPIC_2, Consumed.with(Serdes.String(), Serdes.String()))
+builder.stream(JOIN_INPUT_TOPIC_2, Consumed.with(Serdes.String(), Serdes.String()))
.selectKey((k, v) -> k.substring(0, 1))
.toTable();
final KTable<String, String> joined = input1.join(input2, (l, r) -> l + r);
@@ -67,33 +65,67 @@ private static Topology createTopology() {
return builder.build();
}

-@BeforeEach
-void setup() {
-this.topology = new TestTopology<>(LargeMessageSerdeTest::createTopology, this.createLargeMessageProperties());
-this.topology.start();
-}
+private static Topology createDeadLetterTopology() {
+final StreamsBuilder builder = new StreamsBuilder();
+final KStream<String, String> input = builder.stream(INPUT_TOPIC);
+final KStream<String, String> processed = input.mapValues(v -> {
+throw new RuntimeException("processing error");
+});
+processed.to(OUTPUT_TOPIC);
+return builder.build();
+}

-@AfterEach
-void tearDown() {
-if (this.topology != null) {
-this.topology.stop();
-}
-}
+@Test
+void shouldJoin() {
+// this test creates a topology with a changelog store. The changelog store uses the Serde without headers
+try (final TestTopology<String, String> topology = new TestTopology<>(LargeMessageSerdeTest::createJoinTopology,
+this.createLargeMessageProperties())) {
+topology.start();
+topology.input(JOIN_INPUT_TOPIC_1)
+.withKeySerde(Serdes.String())
+.withValueSerde(Serdes.String())
+.add("a", "foo");
+topology.input(JOIN_INPUT_TOPIC_2)
+.withKeySerde(Serdes.String())
+.withValueSerde(Serdes.String())
+.add("a", "bar");
+final List<ProducerRecord<String, String>> records = topology.streamOutput()
+.withKeySerde(Serdes.String())
+.withValueSerde(Serdes.String())
+.toList();
+assertThat(records)
+.hasSize(1)
+.anySatisfy(producerRecord -> {
+assertThat(producerRecord.key()).isEqualTo("a");
+assertThat(producerRecord.value()).isEqualTo("foobar");
+});
+}
+}

@Test
-void shouldJoin() {
-// this test creates a topology with a changelog store. The changelog store uses the Serde without headers
-this.getInput(INPUT_TOPIC_1)
-.add("a", "foo");
-this.getInput(INPUT_TOPIC_2)
-.add("a", "bar");
-final List<ProducerRecord<String, String>> records = this.getOutput().toList();
-assertThat(records)
-.hasSize(1)
-.anySatisfy(producerRecord -> {
-assertThat(producerRecord.key()).isEqualTo("a");
-assertThat(producerRecord.value()).isEqualTo("foobar");
-});
+void shouldSupportDeadLetters() {
+final Map<String, Object> properties = this.createLargeMessageProperties();
+final String errorTopic = "error";
+properties.put(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, errorTopic);
+properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+LogAndContinueProcessingExceptionHandler.class);
+try (final TestTopology<String, String> topology = new TestTopology<>(
+LargeMessageSerdeTest::createDeadLetterTopology, properties)) {
+topology.start();
+topology.input(INPUT_TOPIC)
+.add("a", "foo");
+final List<ProducerRecord<String, String>> output = topology.streamOutput(OUTPUT_TOPIC)
+.toList();
+assertThat(output).isEmpty();
+final List<ProducerRecord<String, String>> error = topology.streamOutput(errorTopic)
+.toList();
+assertThat(error)
+.hasSize(1)
+.anySatisfy(producerRecord -> {
+assertThat(producerRecord.key()).isEqualTo("a");
+assertThat(producerRecord.value()).isEqualTo("foo");
+});
+}
}

private Map<String, Object> createLargeMessageProperties() {
@@ -110,16 +142,4 @@ private Map<String, Object> createLargeMessageProperties() {
return properties;
}

-private TestOutput<String, String> getOutput() {
-return this.topology.streamOutput()
-.withKeySerde(Serdes.String())
-.withValueSerde(Serdes.String());
-}
-
-private TestInput<String, String> getInput(final String topic) {
-return this.topology.input(topic)
-.withKeySerde(Serdes.String())
-.withValueSerde(Serdes.String());
-}
-
}
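The two Streams settings exercised by shouldSupportDeadLetters are the interesting part of this test. As a rough sketch (not part of this PR), this is how they might look in an application's own configuration; the application id, bootstrap servers, and DLQ topic name are placeholders, and the large-message serde settings from createLargeMessageProperties() are omitted:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;

public final class DeadLetterConfigSketch {
    public static Map<String, Object> deadLetterProperties() {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "large-message-app"); // placeholder
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // route records that fail during processing to a dead letter topic...
        properties.put(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "my-app-dlq");
        // ...and keep processing instead of crashing the application
        properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndContinueProcessingExceptionHandler.class);
        return properties;
    }
}

Because deserialize() now leaves the large-message headers in place, a record routed to the dead letter topic still carries the flag needed to resolve its backing URI later.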