Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
7fc0bef
feat(build): add Kafka dependencies for application and testing
Do-oya Oct 17, 2025
23d5c2c
feat(docker): enhance docker-compose with Kafka and Zookeeper setup
Do-oya Oct 17, 2025
9b4fb24
feat(post): add serialization and event publishing interface
Do-oya Oct 17, 2025
eeab1b6
feat(kafka): implement PostEventProducer for Kafka event publishing
Do-oya Oct 17, 2025
d5edabd
feat(kafka): add producer configuration for Kafka integration
Do-oya Oct 17, 2025
f750cc2
feat(kafka): add consumer and configuration for event processing
Do-oya Oct 17, 2025
f18a242
feat(kafka): implement post event processing and message handling
Do-oya Oct 17, 2025
bda2fce
feat(kafka): enhance producer and consumer configurations for flexibi…
Do-oya Oct 17, 2025
ce6a906
feat(kafka): enhance PostEventProducer for transactional messaging
Do-oya Oct 17, 2025
caf4d33
feat(post): replace PostSearchIndexer with PostEventPublisher
Do-oya Oct 17, 2025
90b87f8
feat(kafka): add DirectPostEventPublisher for handling events locally
Do-oya Oct 17, 2025
cd3c782
feat(config): simplify test configuration and add Kafka toggle
Do-oya Oct 17, 2025
6d413b0
feat(test): add integration tests for post event pipelines
Do-oya Oct 17, 2025
bbb4623
feat(test): remove PostEventHandlerTest for outdated indexing logic
Do-oya Oct 17, 2025
83116ce
feat(ci): add Kafka and Zookeeper services to CI pipeline
Do-oya Oct 17, 2025
60a3b56
feat(ci): improve CI pipeline step readability with emojis
Do-oya Oct 17, 2025
c804065
feat(ci): add compile check workflow for PR validation
Do-oya Oct 17, 2025
c4eac26
feat(ci): remove Zookeeper and migrate Kafka to KRaft mode
Do-oya Oct 17, 2025
57daacf
feat(ci): add Kafka cluster ID to CI environment configuration
Do-oya Oct 17, 2025
56a8378
feat(ci): refine Kafka startup and healthcheck configurations
Do-oya Oct 17, 2025
cd7f906
feat(ci): update Kafka start script with entrypoint and echo log
Do-oya Oct 17, 2025
9519b88
feat(ci): update Kafka entrypoint for cleaner command execution
Do-oya Oct 17, 2025
918309d
feat(ci): simplify Kafka entrypoint command in workflow
Do-oya Oct 17, 2025
0299228
feat(ci): remove redundant Kafka options and add cluster ID
Do-oya Oct 17, 2025
c49eff3
feat(ci): add CLUSTER_ID environment variable for Kafka setup
Do-oya Oct 17, 2025
b261fea
feat(ci): update Kafka cluster ID in environment variables
Do-oya Oct 17, 2025
8100e0c
feat(kafka): add error handler to post event consumer
Do-oya Oct 18, 2025
b757262
feat(gradle): add spring-retry dependency for retry logic
Do-oya Oct 18, 2025
59d2de9
feat(kafka): add retry template for Kafka producer
Do-oya Oct 18, 2025
565d730
feat(kafka): improve post event producer with retry mechanism
Do-oya Oct 18, 2025
2e0bcd9
feat(domain): add serializable implementation to post entities
Do-oya Oct 18, 2025
a8c7dfc
feat(kafka): add unique consumer group ID for test configuration
Do-oya Oct 18, 2025
ce42856
feat(docs): enhance Kafka and event pipeline documentation
Do-oya Oct 18, 2025
6458bb4
fix(kafka): handle unexpected exceptions in post event producer
Do-oya Oct 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 102 additions & 47 deletions .docs/도메인 모델 문서.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,14 @@ stateDiagram-v2

#### PostContent 값 객체
- **제목 필수**: 빈 제목 허용 안함
- **내용 선택**: 본문은 빈 내용 허용
- **내용 필수**: 본문은 빈 내용 허용 안함, 최대 50,000자 제한
- **불변성**: 수정 시 새로운 인스턴스 생성

#### PostMetaData 값 객체
- **시간 추적**: 생성/수정/발행 시점 기록
- **자동 갱신**: 수정 시 modifiedAt 자동 업데이트
- **불변성**: 시점 정보는 생성 후 변경 불가
- **직렬화 지원**: Kafka 전송을 위해 관련 엔티티/값 객체(Post, PostContent, PostMetaData, Tag)는 모두 `Serializable`을 구현

## 댓글(Comment) 애그리거트

Expand Down Expand Up @@ -368,37 +369,30 @@ classDiagram
### 포스트 관련 이벤트

```mermaid
graph TB
subgraph "Post Events"
PC[PostCreated]
PU[PostUpdated]
PPub[PostPublished]
PH[PostHidden]
PD[PostDeleted]
PV[PostViewed]
PL[PostLiked]
PUL[PostUnliked]
end

subgraph "Comment Events"
CC[CommentCreated]
CU[CommentUpdated]
CH[CommentHidden]
CD[CommentDeleted]
end

subgraph "Event Handlers"
PSE[PostStatsEventHandler]
end

PC --> PSE
PV --> PSE
PL --> PSE
PUL --> PSE
CC --> PSE
CD --> PSE
graph LR
PostAggregate((Post Aggregate))
Handler[PostEventHandler]
Publisher[PostEventPublisher]
Producer[PostEventProducer]
Kafka[(Kafka Broker)]
Consumer[PostEventConsumer]
Processor[PostEventProcessor]
Indexer[PostSearchIndexer → Elasticsearch]
Direct[DirectPostEventPublisher]
Stats[PostStatsEventHandler]

PostAggregate --> Handler
Handler --> Publisher
Publisher --> Producer
Producer --> |PostEventMessage| Kafka
Kafka --> Consumer --> Processor --> Indexer
Publisher --> Direct --> Processor
Processor --> Indexer
PostAggregate --> Stats
```

도메인 이벤트는 Post 애그리거트에서 발행되며 `PostEventHandler`가 애플리케이션 이벤트로 수신합니다. 기본적으로 Kafka를 거쳐 `PostEventProcessor`가 Elasticsearch 색인을 갱신하고, 통계 관련 이벤트는 `PostStatsEventHandler`가 별도로 처리합니다. Kafka를 사용할 수 없는 경우 `DirectPostEventPublisher`가 동일한 프로세서를 직접 호출합니다.

### 이벤트 처리 규칙

#### 통계 업데이트
Expand Down Expand Up @@ -564,27 +558,88 @@ public class Post extends AbstractAggregateRoot {
#### 이벤트 처리 패턴

```java
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handlePostViewed(PostViewed event) {
if (event.postId() == null) return;

log.info("PostViewed 이벤트 처리: postId={}, viewerId={}", event.postId(), event.memberId());

try {
postStatsManager.incrementViewCount(event.postId());
} catch (Exception e) {
log.error("조회수 증가 실패: postId={}", event.postId(), e);
// 통계 실패가 메인 로직에 영향주지 않도록 격리
@Component
@RequiredArgsConstructor
public class PostEventHandler {

private final PostEventPublisher postEventPublisher;

@EventListener
public void handlePostCreated(PostCreated event) {
postEventPublisher.publish(event);
}

@EventListener
public void handlePostUpdated(PostUpdated event) {
postEventPublisher.publish(event);
}

@EventListener
public void handlePostDeleted(PostDeleted event) {
postEventPublisher.publish(event);
}
}

@Component
@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true")
@RequiredArgsConstructor
public class PostEventProducer implements PostEventPublisher {
private final KafkaTemplate<String, PostEventMessage> kafkaTemplate;
private final RetryTemplate retryTemplate;

@Value("${see.kafka.topics.post-events:post-events}")
private String topic;

@Override
public void publish(DomainEvent event) {
PostEventMessage message = PostEventMessage.from(event);
String key = message.postId().toString();

Runnable sender = () -> retryTemplate.execute(ctx -> {
kafkaTemplate.send(topic, key, message).get();
return null;
}, recovery -> {
throw new IllegalStateException("Kafka 전송 실패", recovery.getLastThrowable());
});

if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
sender.run();
}
});
} else {
sender.run();
}
}
}

@Component
@RequiredArgsConstructor
class PostEventProcessor {
private final PostRepository postRepository;
private final PostSearchIndexer indexer;

void process(PostEventMessage message) {
switch (message.type()) {
case CREATED, UPDATED -> postRepository.findById(message.postId())
.ifPresent(indexer::index);
case DELETED -> indexer.delete(message.postId());
}
}
}
```

Kafka를 사용할 수 없는 환경에서는 `see.kafka.enabled=false`로 설정하여 `DirectPostEventPublisher`가 즉시 `PostEventProcessor`를 호출하도록 구성했습니다.

#### 이벤트 처리 원칙
- **트랜잭션 분리**: `@TransactionalEventListener(AFTER_COMMIT)`로 메인 트랜잭션과 분리
- **비동기 처리**: `@Async`로 성능 최적화
- **예외 격리**: try-catch로 사이드 이펙트 실패가 메인 로직에 영향 없도록 보장
- **PostStatsManager 활용**: 인터페이스를 통한 관심사 분리
- **트랜잭션 후처리**: Kafka 전송은 커밋 이후 `TransactionSynchronization`을 통해 수행
- **메시지 키**: `postId`를 메시지 키로 사용해 파티션 내 순서를 보장
- **재시도 정책**: `RetryTemplate`으로 최대 3회 재시도 후 실패를 호출자에게 전달
- **컨슈머 에러 처리**: `DefaultErrorHandler`가 적용된 리스너 컨테이너로 재처리/로그를 관리
- **Fallback 보장**: Kafka 비활성화 시에도 동일한 이벤트 플로우를 직접 실행
- **테스트 격리**: Embedded Kafka 통합 테스트에서는 `${random.uuid}` 그룹 ID로 소비자를 분리

### 5. 헥사고날 아키텍처 적용

Expand Down Expand Up @@ -680,4 +735,4 @@ public class JpaMemberRepository implements MemberRepository {
- **Spring Data JPA 레퍼런스**
- **헥사고날 아키텍처 가이드**

이 도메인 모델 문서는 See 프로젝트의 핵심 비즈니스 로직과 설계 원칙을 담고 있습니다. 지속적인 도메인 지식 발견과 함께 문서도 함께 발전시켜 나가야 합니다.
이 도메인 모델 문서는 See 프로젝트의 핵심 비즈니스 로직과 설계 원칙을 담고 있습니다. 지속적인 도메인 지식 발견과 함께 문서도 함께 발전시켜 나가야 합니다.
16 changes: 15 additions & 1 deletion .docs/용어 사전 문서.md
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,20 @@ graph LR
| **롤백** | Rollback | 트랜잭션을 취소하고 이전 상태로 복원 | "에러 발생 시 자동으로 롤백됩니다" |
| **커밋** | Commit | 트랜잭션의 변경사항을 영구 반영 | "모든 작업이 성공하면 커밋합니다" |

### 메시징 & 이벤트 용어

| 한국어 | 영어 | 정의 | 사용 예시 |
|--------|------|------|-----------|
| **도메인 이벤트** | Domain Event | 애그리거트가 발행하는 상태 변화 알림 | "PostCreated 이벤트가 Kafka로 전송됩니다" |
| **PostEventPublisher** | PostEventPublisher | 도메인 이벤트를 외부 파이프라인으로 전달하는 애플리케이션 포트 | "PostEventHandler는 PostEventPublisher를 통해 메시지를 보냅니다" |
| **PostEventProducer** | PostEventProducer | KafkaTemplate을 사용해 이벤트 메시지를 브로커로 보내는 어댑터 | "재시도 정책이 적용된 PostEventProducer" |
| **PostEventMessage** | PostEventMessage | Kafka에 전송되는 직렬화 가능한 포스트 이벤트 DTO | "postId와 이벤트 타입만 포함한 경량 메시지" |
| **PostEventProcessor** | PostEventProcessor | Kafka 메시지를 읽어 최신 포스트 스냅샷을 색인 서비스로 위임하는 컴포넌트 | "메시지 타입에 따라 index/delete를 수행" |
| **DirectPostEventPublisher** | DirectPostEventPublisher | Kafka 비활성화 시 즉시 색인을 수행하는 대체 어댑터 | "로컬 개발에서는 DirectPostEventPublisher가 사용됩니다" |
| **Embedded Kafka** | Embedded Kafka | 테스트에서 경량 Kafka 브로커를 실행하는 도구 | "PostEventKafkaPipelineTest는 Embedded Kafka로 파이프라인을 검증" |
| **RetryTemplate** | RetryTemplate | 일정한 정책으로 연산을 재시도하게 해주는 Spring Retry 유틸리티 | "Kafka 전송 실패 시 RetryTemplate이 최대 3회 재시도합니다" |
| **see.kafka.enabled** | see.kafka.enabled | Kafka 사용 여부를 제어하는 애플리케이션 설정 | "테스트에서는 see.kafka.enabled=false로 즉시 처리" |

## 💡 디자인 패턴 용어

### 생성 패턴
Expand Down Expand Up @@ -402,4 +416,4 @@ graph LR

---

이 용어 사전은 See 프로젝트의 성장과 함께 지속적으로 업데이트됩니다. 새로운 용어나 수정이 필요한 내용이 있다면 언제든 팀에 공유해 주세요.
이 용어 사전은 See 프로젝트의 성장과 함께 지속적으로 업데이트됩니다. 새로운 용어나 수정이 필요한 내용이 있다면 언제든 팀에 공유해 주세요.
99 changes: 64 additions & 35 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,66 +32,84 @@ jobs:
--health-timeout 5s
--health-retries 10

kafka:
image: confluentinc/cp-kafka:7.6.1
ports:
- 9092:9092
env:
CLUSTER_ID: 9UbE6ogDQKqe49oBCZZmnA
KAFKA_CLUSTER_ID: 9UbE6ogDQKqe49oBCZZmnA
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_NODE_ID: 1
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
options: >-
--health-cmd "nc -z localhost 9092"
--health-interval 15s
--health-timeout 10s
--health-retries 10

steps:
- name: Check out repository
- name: 🧩 Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 1

- name: Set up JDK 21
- name: ☕️ Set up JDK 21
uses: actions/setup-java@v4
with:
java-version: '21'
distribution: 'temurin'
cache: gradle

- name: Create application.yml file
- name: 📝 Create application.yml
run: |
mkdir -p ./src/main/resources
echo "${{ secrets.APPLICATION_YML }}" > ./src/main/resources/application.yml

- name: Disable IPv6
- name: 🧱 Disable IPv6 (network stability)
run: |
sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1

- name: Validate Gradle wrapper
- name: 🧩 Validate Gradle Wrapper
uses: gradle/wrapper-validation-action@v2
with:
min-wrapper-count: 1
allow-snapshots: false

- name: Make gradlew executable
- name: 🧍 Make gradlew executable
run: chmod +x ./gradlew

- name: Verify Gradle Wrapper Checksum
run: |
if [ -f "gradle/wrapper/gradle-wrapper.properties" ]; then
echo "Gradle wrapper configuration verified"
cat gradle/wrapper/gradle-wrapper.properties
fi

- name: Setup Gradle
- name: ⚙️ Setup Gradle
uses: gradle/gradle-build-action@v2
with:
gradle-build-scan-report: true
cache-read-only: false

- name: Wait for Elasticsearch to be ready
- name: 🕐 Wait for Kafka and Elasticsearch
run: |
echo "Waiting for Elasticsearch..."
for i in {1..20}; do
if curl -fsS http://localhost:9200 >/dev/null; then
echo "Elasticsearch is up!"
echo "Waiting for Elasticsearch and Kafka to be ready..."
for i in {1..25}; do
es_ready=$(curl -fsS http://localhost:9200/_cluster/health > /dev/null && echo "yes" || echo "no")
kafka_ready=$(nc -z localhost 9092 && echo "yes" || echo "no")

if [ "$es_ready" = "yes" ] && [ "$kafka_ready" = "yes" ]; then
echo "✅ All services are ready!"
exit 0
fi
echo "Retrying in 3s..."
sleep 3
done
echo "Elasticsearch did not become ready in time."
exit 1

- name: Run tests
echo "⏳ Waiting for services... retrying in 5s ($i/25)"
sleep 5
done

echo "❌ Services did not start in time"
exit 1

- name: 🧪 Run tests
run: |
./gradlew clean test \
--parallel \
Expand All @@ -102,8 +120,9 @@ jobs:
-Dspring.profiles.active=test
env:
SPRING_ELASTICSEARCH_URIS: http://localhost:9200
SPRING_KAFKA_BOOTSTRAP_SERVERS: localhost:9092

- name: Upload test results
- name: 📦 Upload test results
uses: actions/upload-artifact@v4
if: always()
with:
Expand All @@ -118,29 +137,39 @@ jobs:
if: github.event_name == 'pull_request'

steps:
- name: Check out repository
- name: 🧩 Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 1

- name: Set up JDK 21
- name: ☕️ Set up JDK 21
uses: actions/setup-java@v4
with:
java-version: '21'
distribution: 'temurin'
cache: gradle

- name: Create application.yml file
- name: 📝 Create application.yml
run: |
mkdir -p ./src/main/resources
echo "${{ secrets.APPLICATION_YML }}" > ./src/main/resources/application.yml

- name: Setup Gradle
- name: ⚙️ Setup Gradle
uses: gradle/gradle-build-action@v2

- name: Compile check
- name: 🧱 Compile check
run: |
./gradlew compileJava compileTestJava \
--parallel \
--build-cache \
--configuration-cache
--configuration-cache

- name: 📦 Upload build results
uses: actions/upload-artifact@v4
if: always()
with:
name: build-results
path: |
build/classes/
build/libs/
retention-days: 7
Loading
Loading