Skip to content

Commit 2dd5375

Browse files
authored
Merge pull request #159 from see-projects/feat#158-kafka
도메인 이벤트 기반 색인 파이프라인 Kafka 도입
2 parents a91fcfd + 6458bb4 commit 2dd5375

27 files changed

+941
-173
lines changed

.docs/도메인 모델 문서.md

Lines changed: 102 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -244,13 +244,14 @@ stateDiagram-v2
244244

245245
#### PostContent 값 객체
246246
- **제목 필수**: 빈 제목 허용 안함
247-
- **내용 선택**: 본문은 빈 내용 허용
247+
- **내용 필수**: 본문은 빈 내용 허용 안함, 최대 50,000자 제한
248248
- **불변성**: 수정 시 새로운 인스턴스 생성
249249

250250
#### PostMetaData 값 객체
251251
- **시간 추적**: 생성/수정/발행 시점 기록
252252
- **자동 갱신**: 수정 시 modifiedAt 자동 업데이트
253253
- **불변성**: 시점 정보는 생성 후 변경 불가
254+
- **직렬화 지원**: Kafka 전송을 위해 관련 엔티티/값 객체(Post, PostContent, PostMetaData, Tag)는 모두 `Serializable`을 구현
254255

255256
## 댓글(Comment) 애그리거트
256257

@@ -368,37 +369,30 @@ classDiagram
368369
### 포스트 관련 이벤트
369370

370371
```mermaid
371-
graph TB
372-
subgraph "Post Events"
373-
PC[PostCreated]
374-
PU[PostUpdated]
375-
PPub[PostPublished]
376-
PH[PostHidden]
377-
PD[PostDeleted]
378-
PV[PostViewed]
379-
PL[PostLiked]
380-
PUL[PostUnliked]
381-
end
382-
383-
subgraph "Comment Events"
384-
CC[CommentCreated]
385-
CU[CommentUpdated]
386-
CH[CommentHidden]
387-
CD[CommentDeleted]
388-
end
389-
390-
subgraph "Event Handlers"
391-
PSE[PostStatsEventHandler]
392-
end
393-
394-
PC --> PSE
395-
PV --> PSE
396-
PL --> PSE
397-
PUL --> PSE
398-
CC --> PSE
399-
CD --> PSE
372+
graph LR
373+
PostAggregate((Post Aggregate))
374+
Handler[PostEventHandler]
375+
Publisher[PostEventPublisher]
376+
Producer[PostEventProducer]
377+
Kafka[(Kafka Broker)]
378+
Consumer[PostEventConsumer]
379+
Processor[PostEventProcessor]
380+
Indexer[PostSearchIndexer → Elasticsearch]
381+
Direct[DirectPostEventPublisher]
382+
Stats[PostStatsEventHandler]
383+
384+
PostAggregate --> Handler
385+
Handler --> Publisher
386+
Publisher --> Producer
387+
Producer --> |PostEventMessage| Kafka
388+
Kafka --> Consumer --> Processor --> Indexer
389+
Publisher --> Direct --> Processor
390+
Processor --> Indexer
391+
PostAggregate --> Stats
400392
```
401393

394+
도메인 이벤트는 Post 애그리거트에서 발행되며 `PostEventHandler`가 애플리케이션 이벤트로 수신합니다. 기본적으로 Kafka를 거쳐 `PostEventProcessor`가 Elasticsearch 색인을 갱신하고, 통계 관련 이벤트는 `PostStatsEventHandler`가 별도로 처리합니다. Kafka를 사용할 수 없는 경우 `DirectPostEventPublisher`가 동일한 프로세서를 직접 호출합니다.
395+
402396
### 이벤트 처리 규칙
403397

404398
#### 통계 업데이트
@@ -564,27 +558,88 @@ public class Post extends AbstractAggregateRoot {
564558
#### 이벤트 처리 패턴
565559

566560
```java
567-
@Async
568-
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
569-
public void handlePostViewed(PostViewed event) {
570-
if (event.postId() == null) return;
571-
572-
log.info("PostViewed 이벤트 처리: postId={}, viewerId={}", event.postId(), event.memberId());
573-
574-
try {
575-
postStatsManager.incrementViewCount(event.postId());
576-
} catch (Exception e) {
577-
log.error("조회수 증가 실패: postId={}", event.postId(), e);
578-
// 통계 실패가 메인 로직에 영향주지 않도록 격리
561+
@Component
562+
@RequiredArgsConstructor
563+
public class PostEventHandler {
564+
565+
private final PostEventPublisher postEventPublisher;
566+
567+
@EventListener
568+
public void handlePostCreated(PostCreated event) {
569+
postEventPublisher.publish(event);
570+
}
571+
572+
@EventListener
573+
public void handlePostUpdated(PostUpdated event) {
574+
postEventPublisher.publish(event);
575+
}
576+
577+
@EventListener
578+
public void handlePostDeleted(PostDeleted event) {
579+
postEventPublisher.publish(event);
580+
}
581+
}
582+
583+
@Component
584+
@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true")
585+
@RequiredArgsConstructor
586+
public class PostEventProducer implements PostEventPublisher {
587+
private final KafkaTemplate<String, PostEventMessage> kafkaTemplate;
588+
private final RetryTemplate retryTemplate;
589+
590+
@Value("${see.kafka.topics.post-events:post-events}")
591+
private String topic;
592+
593+
@Override
594+
public void publish(DomainEvent event) {
595+
PostEventMessage message = PostEventMessage.from(event);
596+
String key = message.postId().toString();
597+
598+
Runnable sender = () -> retryTemplate.execute(ctx -> {
599+
kafkaTemplate.send(topic, key, message).get();
600+
return null;
601+
}, recovery -> {
602+
throw new IllegalStateException("Kafka 전송 실패", recovery.getLastThrowable());
603+
});
604+
605+
if (TransactionSynchronizationManager.isSynchronizationActive()) {
606+
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
607+
@Override
608+
public void afterCommit() {
609+
sender.run();
610+
}
611+
});
612+
} else {
613+
sender.run();
614+
}
615+
}
616+
}
617+
618+
@Component
619+
@RequiredArgsConstructor
620+
class PostEventProcessor {
621+
private final PostRepository postRepository;
622+
private final PostSearchIndexer indexer;
623+
624+
void process(PostEventMessage message) {
625+
switch (message.type()) {
626+
case CREATED, UPDATED -> postRepository.findById(message.postId())
627+
.ifPresent(indexer::index);
628+
case DELETED -> indexer.delete(message.postId());
629+
}
579630
}
580631
}
581632
```
582633

634+
Kafka를 사용할 수 없는 환경에서는 `see.kafka.enabled=false`로 설정하여 `DirectPostEventPublisher`가 즉시 `PostEventProcessor`를 호출하도록 구성했습니다.
635+
583636
#### 이벤트 처리 원칙
584-
- **트랜잭션 분리**: `@TransactionalEventListener(AFTER_COMMIT)`로 메인 트랜잭션과 분리
585-
- **비동기 처리**: `@Async`로 성능 최적화
586-
- **예외 격리**: try-catch로 사이드 이펙트 실패가 메인 로직에 영향 없도록 보장
587-
- **PostStatsManager 활용**: 인터페이스를 통한 관심사 분리
637+
- **트랜잭션 후처리**: Kafka 전송은 커밋 이후 `TransactionSynchronization`을 통해 수행
638+
- **메시지 키**: `postId`를 메시지 키로 사용해 파티션 내 순서를 보장
639+
- **재시도 정책**: `RetryTemplate`으로 최대 3회 재시도 후 실패를 호출자에게 전달
640+
- **컨슈머 에러 처리**: `DefaultErrorHandler`가 적용된 리스너 컨테이너로 재처리/로그를 관리
641+
- **Fallback 보장**: Kafka 비활성화 시에도 동일한 이벤트 플로우를 직접 실행
642+
- **테스트 격리**: Embedded Kafka 통합 테스트에서는 `${random.uuid}` 그룹 ID로 소비자를 분리
588643

589644
### 5. 헥사고날 아키텍처 적용
590645

@@ -680,4 +735,4 @@ public class JpaMemberRepository implements MemberRepository {
680735
- **Spring Data JPA 레퍼런스**
681736
- **헥사고날 아키텍처 가이드**
682737

683-
이 도메인 모델 문서는 See 프로젝트의 핵심 비즈니스 로직과 설계 원칙을 담고 있습니다. 지속적인 도메인 지식 발견과 함께 문서도 함께 발전시켜 나가야 합니다.
738+
이 도메인 모델 문서는 See 프로젝트의 핵심 비즈니스 로직과 설계 원칙을 담고 있습니다. 지속적인 도메인 지식 발견과 함께 문서도 함께 발전시켜 나가야 합니다.

.docs/용어 사전 문서.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,20 @@ graph LR
343343
| **롤백** | Rollback | 트랜잭션을 취소하고 이전 상태로 복원 | "에러 발생 시 자동으로 롤백됩니다" |
344344
| **커밋** | Commit | 트랜잭션의 변경사항을 영구 반영 | "모든 작업이 성공하면 커밋합니다" |
345345

346+
### 메시징 & 이벤트 용어
347+
348+
| 한국어 | 영어 | 정의 | 사용 예시 |
349+
|--------|------|------|-----------|
350+
| **도메인 이벤트** | Domain Event | 애그리거트가 발행하는 상태 변화 알림 | "PostCreated 이벤트가 Kafka로 전송됩니다" |
351+
| **PostEventPublisher** | PostEventPublisher | 도메인 이벤트를 외부 파이프라인으로 전달하는 애플리케이션 포트 | "PostEventHandler는 PostEventPublisher를 통해 메시지를 보냅니다" |
352+
| **PostEventProducer** | PostEventProducer | KafkaTemplate을 사용해 이벤트 메시지를 브로커로 보내는 어댑터 | "재시도 정책이 적용된 PostEventProducer" |
353+
| **PostEventMessage** | PostEventMessage | Kafka에 전송되는 직렬화 가능한 포스트 이벤트 DTO | "postId와 이벤트 타입만 포함한 경량 메시지" |
354+
| **PostEventProcessor** | PostEventProcessor | Kafka 메시지를 읽어 최신 포스트 스냅샷을 색인 서비스로 위임하는 컴포넌트 | "메시지 타입에 따라 index/delete를 수행" |
355+
| **DirectPostEventPublisher** | DirectPostEventPublisher | Kafka 비활성화 시 즉시 색인을 수행하는 대체 어댑터 | "로컬 개발에서는 DirectPostEventPublisher가 사용됩니다" |
356+
| **Embedded Kafka** | Embedded Kafka | 테스트에서 경량 Kafka 브로커를 실행하는 도구 | "PostEventKafkaPipelineTest는 Embedded Kafka로 파이프라인을 검증" |
357+
| **RetryTemplate** | RetryTemplate | 일정한 정책으로 연산을 재시도하게 해주는 Spring Retry 유틸리티 | "Kafka 전송 실패 시 RetryTemplate이 최대 3회 재시도합니다" |
358+
| **see.kafka.enabled** | see.kafka.enabled | Kafka 사용 여부를 제어하는 애플리케이션 설정 | "테스트에서는 see.kafka.enabled=false로 즉시 처리" |
359+
346360
## 💡 디자인 패턴 용어
347361

348362
### 생성 패턴
@@ -402,4 +416,4 @@ graph LR
402416

403417
---
404418

405-
이 용어 사전은 See 프로젝트의 성장과 함께 지속적으로 업데이트됩니다. 새로운 용어나 수정이 필요한 내용이 있다면 언제든 팀에 공유해 주세요.
419+
이 용어 사전은 See 프로젝트의 성장과 함께 지속적으로 업데이트됩니다. 새로운 용어나 수정이 필요한 내용이 있다면 언제든 팀에 공유해 주세요.

.github/workflows/ci.yml

Lines changed: 64 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -32,66 +32,84 @@ jobs:
3232
--health-timeout 5s
3333
--health-retries 10
3434
35+
kafka:
36+
image: confluentinc/cp-kafka:7.6.1
37+
ports:
38+
- 9092:9092
39+
env:
40+
CLUSTER_ID: 9UbE6ogDQKqe49oBCZZmnA
41+
KAFKA_CLUSTER_ID: 9UbE6ogDQKqe49oBCZZmnA
42+
KAFKA_PROCESS_ROLES: broker,controller
43+
KAFKA_NODE_ID: 1
44+
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
45+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
46+
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
47+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
48+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
49+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
50+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
51+
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
52+
options: >-
53+
--health-cmd "nc -z localhost 9092"
54+
--health-interval 15s
55+
--health-timeout 10s
56+
--health-retries 10
57+
3558
steps:
36-
- name: Check out repository
59+
- name: 🧩 Checkout repository
3760
uses: actions/checkout@v4
3861
with:
3962
fetch-depth: 1
4063

41-
- name: Set up JDK 21
64+
- name: ☕️ Set up JDK 21
4265
uses: actions/setup-java@v4
4366
with:
4467
java-version: '21'
4568
distribution: 'temurin'
4669
cache: gradle
4770

48-
- name: Create application.yml file
71+
- name: 📝 Create application.yml
4972
run: |
5073
mkdir -p ./src/main/resources
5174
echo "${{ secrets.APPLICATION_YML }}" > ./src/main/resources/application.yml
5275
53-
- name: Disable IPv6
76+
- name: 🧱 Disable IPv6 (network stability)
5477
run: |
5578
sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
5679
sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
5780
58-
- name: Validate Gradle wrapper
81+
- name: 🧩 Validate Gradle Wrapper
5982
uses: gradle/wrapper-validation-action@v2
60-
with:
61-
min-wrapper-count: 1
62-
allow-snapshots: false
6383

64-
- name: Make gradlew executable
84+
- name: 🧍 Make gradlew executable
6585
run: chmod +x ./gradlew
6686

67-
- name: Verify Gradle Wrapper Checksum
68-
run: |
69-
if [ -f "gradle/wrapper/gradle-wrapper.properties" ]; then
70-
echo "Gradle wrapper configuration verified"
71-
cat gradle/wrapper/gradle-wrapper.properties
72-
fi
73-
74-
- name: Setup Gradle
87+
- name: ⚙️ Setup Gradle
7588
uses: gradle/gradle-build-action@v2
7689
with:
7790
gradle-build-scan-report: true
7891
cache-read-only: false
7992

80-
- name: Wait for Elasticsearch to be ready
93+
- name: 🕐 Wait for Kafka and Elasticsearch
8194
run: |
82-
echo "Waiting for Elasticsearch..."
83-
for i in {1..20}; do
84-
if curl -fsS http://localhost:9200 >/dev/null; then
85-
echo "Elasticsearch is up!"
95+
echo "Waiting for Elasticsearch and Kafka to be ready..."
96+
for i in {1..25}; do
97+
es_ready=$(curl -fsS http://localhost:9200/_cluster/health > /dev/null && echo "yes" || echo "no")
98+
kafka_ready=$(nc -z localhost 9092 && echo "yes" || echo "no")
99+
100+
if [ "$es_ready" = "yes" ] && [ "$kafka_ready" = "yes" ]; then
101+
echo "✅ All services are ready!"
86102
exit 0
87103
fi
88-
echo "Retrying in 3s..."
89-
sleep 3
90-
done
91-
echo "Elasticsearch did not become ready in time."
92-
exit 1
93104
94-
- name: Run tests
105+
echo "⏳ Waiting for services... retrying in 5s ($i/25)"
106+
sleep 5
107+
done
108+
109+
echo "❌ Services did not start in time"
110+
exit 1
111+
112+
- name: 🧪 Run tests
95113
run: |
96114
./gradlew clean test \
97115
--parallel \
@@ -102,8 +120,9 @@ jobs:
102120
-Dspring.profiles.active=test
103121
env:
104122
SPRING_ELASTICSEARCH_URIS: http://localhost:9200
123+
SPRING_KAFKA_BOOTSTRAP_SERVERS: localhost:9092
105124

106-
- name: Upload test results
125+
- name: 📦 Upload test results
107126
uses: actions/upload-artifact@v4
108127
if: always()
109128
with:
@@ -118,29 +137,39 @@ jobs:
118137
if: github.event_name == 'pull_request'
119138

120139
steps:
121-
- name: Check out repository
140+
- name: 🧩 Checkout repository
122141
uses: actions/checkout@v4
123142
with:
124143
fetch-depth: 1
125144

126-
- name: Set up JDK 21
145+
- name: ☕️ Set up JDK 21
127146
uses: actions/setup-java@v4
128147
with:
129148
java-version: '21'
130149
distribution: 'temurin'
131150
cache: gradle
132151

133-
- name: Create application.yml file
152+
- name: 📝 Create application.yml
134153
run: |
135154
mkdir -p ./src/main/resources
136155
echo "${{ secrets.APPLICATION_YML }}" > ./src/main/resources/application.yml
137156
138-
- name: Setup Gradle
157+
- name: ⚙️ Setup Gradle
139158
uses: gradle/gradle-build-action@v2
140159

141-
- name: Compile check
160+
- name: 🧱 Compile check
142161
run: |
143162
./gradlew compileJava compileTestJava \
144163
--parallel \
145164
--build-cache \
146-
--configuration-cache
165+
--configuration-cache
166+
167+
- name: 📦 Upload build results
168+
uses: actions/upload-artifact@v4
169+
if: always()
170+
with:
171+
name: build-results
172+
path: |
173+
build/classes/
174+
build/libs/
175+
retention-days: 7

0 commit comments

Comments
 (0)