Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
7fc0bef
feat(build): add Kafka dependencies for application and testing
Do-oya Oct 17, 2025
23d5c2c
feat(docker): enhance docker-compose with Kafka and Zookeeper setup
Do-oya Oct 17, 2025
9b4fb24
feat(post): add serialization and event publishing interface
Do-oya Oct 17, 2025
eeab1b6
feat(kafka): implement PostEventProducer for Kafka event publishing
Do-oya Oct 17, 2025
d5edabd
feat(kafka): add producer configuration for Kafka integration
Do-oya Oct 17, 2025
f750cc2
feat(kafka): add consumer and configuration for event processing
Do-oya Oct 17, 2025
f18a242
feat(kafka): implement post event processing and message handling
Do-oya Oct 17, 2025
bda2fce
feat(kafka): enhance producer and consumer configurations for flexibility
Do-oya Oct 17, 2025
ce6a906
feat(kafka): enhance PostEventProducer for transactional messaging
Do-oya Oct 17, 2025
caf4d33
feat(post): replace PostSearchIndexer with PostEventPublisher
Do-oya Oct 17, 2025
90b87f8
feat(kafka): add DirectPostEventPublisher for handling events locally
Do-oya Oct 17, 2025
cd3c782
feat(config): simplify test configuration and add Kafka toggle
Do-oya Oct 17, 2025
6d413b0
feat(test): add integration tests for post event pipelines
Do-oya Oct 17, 2025
bbb4623
feat(test): remove PostEventHandlerTest for outdated indexing logic
Do-oya Oct 17, 2025
83116ce
feat(ci): add Kafka and Zookeeper services to CI pipeline
Do-oya Oct 17, 2025
60a3b56
feat(ci): improve CI pipeline step readability with emojis
Do-oya Oct 17, 2025
c804065
feat(ci): add compile check workflow for PR validation
Do-oya Oct 17, 2025
c4eac26
feat(ci): remove Zookeeper and migrate Kafka to KRaft mode
Do-oya Oct 17, 2025
57daacf
feat(ci): add Kafka cluster ID to CI environment configuration
Do-oya Oct 17, 2025
56a8378
feat(ci): refine Kafka startup and healthcheck configurations
Do-oya Oct 17, 2025
cd7f906
feat(ci): update Kafka start script with entrypoint and echo log
Do-oya Oct 17, 2025
9519b88
feat(ci): update Kafka entrypoint for cleaner command execution
Do-oya Oct 17, 2025
918309d
feat(ci): simplify Kafka entrypoint command in workflow
Do-oya Oct 17, 2025
0299228
feat(ci): remove redundant Kafka options and add cluster ID
Do-oya Oct 17, 2025
c49eff3
feat(ci): add CLUSTER_ID environment variable for Kafka setup
Do-oya Oct 17, 2025
b261fea
feat(ci): update Kafka cluster ID in environment variables
Do-oya Oct 17, 2025
8100e0c
feat(kafka): add error handler to post event consumer
Do-oya Oct 18, 2025
b757262
feat(gradle): add spring-retry dependency for retry logic
Do-oya Oct 18, 2025
59d2de9
feat(kafka): add retry template for Kafka producer
Do-oya Oct 18, 2025
565d730
feat(kafka): improve post event producer with retry mechanism
Do-oya Oct 18, 2025
2e0bcd9
feat(domain): add serializable implementation to post entities
Do-oya Oct 18, 2025
a8c7dfc
feat(kafka): add unique consumer group ID for test configuration
Do-oya Oct 18, 2025
ce42856
feat(docs): enhance Kafka and event pipeline documentation
Do-oya Oct 18, 2025
6458bb4
fix(kafka): handle unexpected exceptions in post event producer
Do-oya Oct 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 43 additions & 52 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,38 @@ jobs:
--health-timeout 5s
--health-retries 10

zookeeper:
image: confluentinc/cp-zookeeper:7.6.1
ports:
- 2181:2181
env:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
options: >-
--health-cmd "echo ruok | nc localhost 2181 | grep imok || exit 1"
--health-interval 10s
--health-timeout 5s
--health-retries 10

kafka:
image: confluentinc/cp-kafka:7.6.1
ports:
- 9092:9092
env:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
options: >-
--health-cmd "nc -z localhost 9092"
--health-interval 10s
--health-timeout 5s
--health-retries 10
depends_on:
- zookeeper

steps:
- name: Check out repository
uses: actions/checkout@v4
Expand All @@ -57,39 +89,29 @@ jobs:

- name: Validate Gradle wrapper
uses: gradle/wrapper-validation-action@v2
with:
min-wrapper-count: 1
allow-snapshots: false

- name: Make gradlew executable
run: chmod +x ./gradlew

- name: Verify Gradle Wrapper Checksum
run: |
if [ -f "gradle/wrapper/gradle-wrapper.properties" ]; then
echo "Gradle wrapper configuration verified"
cat gradle/wrapper/gradle-wrapper.properties
fi

- name: Setup Gradle
uses: gradle/gradle-build-action@v2
with:
gradle-build-scan-report: true
cache-read-only: false

- name: Wait for Elasticsearch to be ready
- name: Wait for services to be ready
run: |
echo "Waiting for Elasticsearch..."
echo "Waiting for Elasticsearch, Kafka, and Zookeeper..."
for i in {1..20}; do
if curl -fsS http://localhost:9200 >/dev/null; then
echo "Elasticsearch is up!"
if curl -fsS http://localhost:9200 >/dev/null && nc -z localhost 9092 && nc -z localhost 2181; then
echo "All services are up!"
exit 0
fi
echo "Retrying in 3s..."
sleep 3
done
echo "Elasticsearch did not become ready in time."
exit 1
echo "Retrying in 5s..."
sleep 5
done
echo "Services did not become ready in time."
exit 1

- name: Run tests
run: |
Expand All @@ -102,6 +124,7 @@ jobs:
-Dspring.profiles.active=test
env:
SPRING_ELASTICSEARCH_URIS: http://localhost:9200
SPRING_KAFKA_BOOTSTRAP_SERVERS: localhost:9092

- name: Upload test results
uses: actions/upload-artifact@v4
Expand All @@ -111,36 +134,4 @@ jobs:
path: |
build/reports/tests/
build/test-results/
retention-days: 7

build-check:
runs-on: ubuntu-latest
if: github.event_name == 'pull_request'

steps:
- name: Check out repository
uses: actions/checkout@v4
with:
fetch-depth: 1

- name: Set up JDK 21
uses: actions/setup-java@v4
with:
java-version: '21'
distribution: 'temurin'
cache: gradle

- name: Create application.yml file
run: |
mkdir -p ./src/main/resources
echo "${{ secrets.APPLICATION_YML }}" > ./src/main/resources/application.yml

- name: Setup Gradle
uses: gradle/gradle-build-action@v2

- name: Compile check
run: |
./gradlew compileJava compileTestJava \
--parallel \
--build-cache \
--configuration-cache
retention-days: 7
2 changes: 2 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-validation")
implementation("org.springframework.security:spring-security-core")
implementation("org.springframework.kafka:spring-kafka")
implementation("io.jsonwebtoken:jjwt-api:0.12.6")
implementation("io.jsonwebtoken:jjwt-impl:0.12.6")
implementation("io.jsonwebtoken:jjwt-jackson:0.12.6")
Expand All @@ -62,6 +63,7 @@ dependencies {

testCompileOnly("org.projectlombok:lombok")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
testImplementation("org.junit-pioneer:junit-pioneer:2.3.0")
testImplementation("org.mockito:mockito-core:5.18.0")
Expand Down
79 changes: 54 additions & 25 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
version: "3.8"

services:
mysql:
image: mysql:8.0
container_name: see-mysql
restart: unless-stopped
ports:
- "3306:3306"
environment:
Expand All @@ -14,30 +17,15 @@ services:
- ./docker/mysql/init:/docker-entrypoint-initdb.d
command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
timeout: 20s
test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-proot"]
interval: 10s
timeout: 5s
retries: 10
restart: unless-stopped

# mysql-test:
# image: mysql:8.0
# container_name: see-test-mysql
# ports:
# - "3307:3306"
# environment:
# MYSQL_ROOT_PASSWORD: root
# MYSQL_DATABASE: see_test
# MYSQL_USER: see_user
# MYSQL_PASSWORD: see_password
# volumes:
# - mysql_test_data:/var/lib/mysql
# - ./docker/mysql/init:/docker-entrypoint-initdb.d
# command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
# restart: unless-stopped

elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
container_name: see-elasticsearch
restart: unless-stopped
environment:
- discovery.type=single-node
- xpack.security.enabled=false
Expand All @@ -51,23 +39,64 @@ services:
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped

kibana:
image: docker.elastic.co/kibana/kibana:8.12.0
container_name: see-kibana
restart: unless-stopped
ports:
- "5601:5601"
environment:
ELASTICSEARCH_HOSTS: http://see-elasticsearch:9200
depends_on:
- elasticsearch
elasticsearch:
condition: service_healthy

zookeeper:
image: confluentinc/cp-zookeeper:7.6.1
container_name: see-zookeeper
restart: unless-stopped
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
healthcheck:
test: ["CMD-SHELL", "echo ruok | nc localhost 2181 | grep imok || exit 1"]
interval: 10s
timeout: 5s
retries: 10

kafka:
image: confluentinc/cp-kafka:7.6.1
container_name: see-kafka
restart: unless-stopped
depends_on:
zookeeper:
condition: service_healthy
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

# 두 가지 리스너를 노출하여 컨테이너 내부/호스트 접근 모두 지원
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://see-kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

# 기본 동작 설정
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_DELETE_TOPIC_ENABLE: "true"

healthcheck:
test: [ "CMD", "bash", "-c", "kafka-topics --bootstrap-server localhost:9092 --list >/dev/null 2>&1" ]
interval: 10s
timeout: 5s
retries: 10

volumes:
mysql_data:
driver: local
mysql_test_data:
es_data:
driver: local

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package dooya.see.adapter.integration.kafka;

import dooya.see.application.post.required.PostEventPublisher;
import dooya.see.domain.shared.DomainEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "false", matchIfMissing = true)
public class DirectPostEventPublisher implements PostEventPublisher {

    /** Processor that performs the actual event handling when Kafka is disabled. */
    private final PostEventProcessor postEventProcessor;

    public DirectPostEventPublisher(PostEventProcessor postEventProcessor) {
        this.postEventProcessor = postEventProcessor;
    }

    /**
     * Hands the domain event straight to the processor, bypassing Kafka.
     * Active only when {@code see.kafka.enabled} is {@code false} or unset.
     *
     * @param event domain event to process synchronously
     */
    @Override
    public void publish(DomainEvent event) {
        log.debug("Kafka 비활성화 상태 - 도메인 이벤트를 직접 처리합니다: {}", event);
        postEventProcessor.process(event);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package dooya.see.adapter.integration.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true")
public class PostEventConsumer {
private final PostEventProcessor postEventProcessor;

@KafkaListener(
topics = "${see.kafka.topics.post-events:post-events}",
groupId = "${spring.kafka.consumer.group-id:post-indexer-group}"
)
public void consume(PostEventMessage message) {
log.debug("Kafka 이벤트 수신: {}", message);
postEventProcessor.process(message);
}
Comment on lines 16 to 24
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

소비자 오류 처리와 역직렬화 설정을 명시하세요

현재 예외 시 처리 전략(DLT/재시도/백오프)이 보이지 않습니다. 또한 POJO 페이로드를 직접 받으려면 JsonDeserializer의 기본 타입/Trusted Packages 설정이 필요합니다. 전용 컨테이너 팩토리와 CommonErrorHandler를 사용하도록 리스너에 명시해 주세요.

예시(리스너에 컨테이너 팩토리 명시):

-    @KafkaListener(
+    @KafkaListener(
             topics = "${see.kafka.topics.post-events:post-events}",
-            groupId = "${spring.kafka.consumer.group-id:post-indexer-group}"
-    )
+            groupId = "${spring.kafka.consumer.group-id:post-indexer-group}",
+            containerFactory = "postEventKafkaListenerContainerFactory"
+    )
     public void consume(PostEventMessage message) {

다음 스캔으로 설정 유무를 확인해 주세요:

Based on learnings


🏁 Script executed:

#!/usr/bin/env bash
# JsonDeserializer/Trusted Packages/에러핸들러/컨테이너 팩토리 존재 여부 점검
rg -nP -C2 '(JsonDeserializer|CommonErrorHandler|DefaultErrorHandler|ConcurrentKafkaListenerContainerFactory|postEventKafkaListenerContainerFactory|spring\.kafka\.consumer\.properties\.spring\.json\.trusted\.packages|JsonDeserializer\.VALUE_DEFAULT_TYPE)'

Length of output: 3779



---

🏁 Script executed:

```shell
#!/bin/bash
# Get full KafkaConsumerConfig.java to see error handler setup
cat src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java

Length of output: 2622


🏁 Script executed:

#!/bin/bash
# Check if @KafkaListener in PostEventConsumer specifies containerFactory
rg -nP -A5 '@KafkaListener' src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java

Length of output: 288



---

🏁 Script executed:

```shell
#!/bin/bash
# Verify if error handler is configured anywhere for Kafka
rg -nP '(setCommonErrorHandler|setErrorHandler|DefaultErrorHandler|CommonErrorHandler)' src/main/java/dooya/see/adapter/integration/kafka/

Length of output: 0


@KafkaListener에 containerFactory를 지정하고 에러 핸들러를 설정하세요

현재 상태 검증 결과:

  • ✓ JsonDeserializer 및 신뢰 패키지(trusted packages)는 이미 올바르게 구성됨 (KafkaConsumerConfig.java 라인 37-40)
  • ✗ @KafkaListener가 containerFactory 파라미터를 지정하지 않음 (PostEventConsumer.java 라인 16-19)
  • ✗ CommonErrorHandler 또는 DefaultErrorHandler가 설정되지 않음

필수 수정 사항:

  1. PostEventConsumer.java 라인 16: containerFactory = "kafkaListenerContainerFactory" 파라미터 추가
  2. KafkaConsumerConfig.java 라인 52: kafkaListenerContainerFactory() 메서드에 에러 핸들러 추가 (예: factory.setCommonErrorHandler(new DefaultErrorHandler()) 또는 사용자 정의 에러 핸들러 구현)
🤖 Prompt for AI Agents
In src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java
around lines 16 to 23, add the containerFactory parameter to the @KafkaListener
annotation (containerFactory = "kafkaListenerContainerFactory") so the listener
uses the configured factory; and in
src/main/java/dooya/see/config/KafkaConsumerConfig.java around line 52, update
the kafkaListenerContainerFactory() method to attach an error handler by calling
factory.setCommonErrorHandler(...) with a DefaultErrorHandler or your custom
CommonErrorHandler instance (ensure any required imports and bean visibility are
in place).

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package dooya.see.adapter.integration.kafka;

import java.io.Serializable;

/**
 * Immutable message describing a post lifecycle event, carried over Kafka.
 *
 * @param type   kind of change that happened to the post
 * @param postId identifier of the affected post
 */
public record PostEventMessage(PostEventType type, Long postId) implements Serializable {

    /** Builds the message for a newly created post. */
    static PostEventMessage created(Long postId) {
        return of(PostEventType.CREATED, postId);
    }

    /** Builds the message for an updated post. */
    static PostEventMessage updated(Long postId) {
        return of(PostEventType.UPDATED, postId);
    }

    /** Builds the message for a deleted post. */
    static PostEventMessage deleted(Long postId) {
        return of(PostEventType.DELETED, postId);
    }

    // Single construction point shared by the factory methods above.
    private static PostEventMessage of(PostEventType type, Long postId) {
        return new PostEventMessage(type, postId);
    }
}
Loading