diff --git "a/.docs/\353\217\204\353\251\224\354\235\270 \353\252\250\353\215\270 \353\254\270\354\204\234.md" "b/.docs/\353\217\204\353\251\224\354\235\270 \353\252\250\353\215\270 \353\254\270\354\204\234.md"
index 5f36ac1e..39a0c565 100644
--- "a/.docs/\353\217\204\353\251\224\354\235\270 \353\252\250\353\215\270 \353\254\270\354\204\234.md"
+++ "b/.docs/\353\217\204\353\251\224\354\235\270 \353\252\250\353\215\270 \353\254\270\354\204\234.md"
@@ -244,13 +244,14 @@ stateDiagram-v2
 #### PostContent 값 객체
 - **제목 필수**: 빈 제목 허용 안함
-- **내용 선택**: 본문은 빈 내용 허용
+- **내용 필수**: 본문은 빈 내용 허용 안함, 최대 50,000자 제한
 - **불변성**: 수정 시 새로운 인스턴스 생성
 
 #### PostMetaData 값 객체
 - **시간 추적**: 생성/수정/발행 시점 기록
 - **자동 갱신**: 수정 시 modifiedAt 자동 업데이트
 - **불변성**: 시점 정보는 생성 후 변경 불가
+- **직렬화 지원**: Kafka 전송을 위해 관련 엔티티/값 객체(Post, PostContent, PostMetaData, Tag)는 모두 `Serializable`을 구현
 
 ## 댓글(Comment) 애그리거트
@@ -368,37 +369,30 @@ classDiagram
 ### 포스트 관련 이벤트
 
 ```mermaid
-graph TB
-    subgraph "Post Events"
-        PC[PostCreated]
-        PU[PostUpdated]
-        PPub[PostPublished]
-        PH[PostHidden]
-        PD[PostDeleted]
-        PV[PostViewed]
-        PL[PostLiked]
-        PUL[PostUnliked]
-    end
-
-    subgraph "Comment Events"
-        CC[CommentCreated]
-        CU[CommentUpdated]
-        CH[CommentHidden]
-        CD[CommentDeleted]
-    end
-
-    subgraph "Event Handlers"
-        PSE[PostStatsEventHandler]
-    end
-
-    PC --> PSE
-    PV --> PSE
-    PL --> PSE
-    PUL --> PSE
-    CC --> PSE
-    CD --> PSE
+graph LR
+    PostAggregate((Post Aggregate))
+    Handler[PostEventHandler]
+    Publisher[PostEventPublisher]
+    Producer[PostEventProducer]
+    Kafka[(Kafka Broker)]
+    Consumer[PostEventConsumer]
+    Processor[PostEventProcessor]
+    Indexer[PostSearchIndexer → Elasticsearch]
+    Direct[DirectPostEventPublisher]
+    Stats[PostStatsEventHandler]
+
+    PostAggregate --> Handler
+    Handler --> Publisher
+    Publisher --> Producer
+    Producer -->|PostEventMessage| Kafka
+    Kafka --> Consumer --> Processor --> Indexer
+    Publisher --> Direct --> Processor
+    PostAggregate --> Stats
 ```
+
+도메인 이벤트는 Post 애그리거트에서 발행되며 `PostEventHandler`가 애플리케이션 이벤트로 수신합니다. 기본적으로 Kafka를 거쳐 `PostEventProcessor`가 Elasticsearch 색인을 갱신하고, 통계 관련 이벤트는 `PostStatsEventHandler`가 별도로 처리합니다. Kafka를 사용할 수 없는 경우 `DirectPostEventPublisher`가 동일한 프로세서를 직접 호출합니다.
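+
+아래는 이 구조를 요약한 최소 스케치입니다. 클래스·설정 이름은 실제 구성 요소를 그대로 썼지만 본문 구현은 주석으로 대신한 예시이며, 세부 시그니처는 `adapter/integration/kafka` 패키지의 실제 코드를 따릅니다.
+
+```java
+// 애플리케이션 계층의 포트: 도메인 이벤트를 외부 파이프라인으로 전달한다
+public interface PostEventPublisher {
+    void publish(DomainEvent event);
+}
+
+// see.kafka.enabled=true 이면 Kafka 어댑터가 PostEventPublisher 구현체로 선택된다
+@Component
+@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true")
+class PostEventProducer implements PostEventPublisher { /* 커밋 이후 Kafka 전송 */ }
+
+// 비활성화(또는 미설정) 시에는 동일한 프로세서를 직접 호출하는 어댑터가 대신 등록된다
+@Component
+@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "false", matchIfMissing = true)
+class DirectPostEventPublisher implements PostEventPublisher { /* PostEventProcessor 즉시 호출 */ }
+```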
+
 ### 이벤트 처리 규칙
 
 #### 통계 업데이트
@@ -564,27 +558,88 @@ public class Post extends AbstractAggregateRoot<Post> {
 
 #### 이벤트 처리 패턴
 ```java
-@Async
-@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
-public void handlePostViewed(PostViewed event) {
-    if (event.postId() == null) return;
-
-    log.info("PostViewed 이벤트 처리: postId={}, viewerId={}", event.postId(), event.memberId());
-
-    try {
-        postStatsManager.incrementViewCount(event.postId());
-    } catch (Exception e) {
-        log.error("조회수 증가 실패: postId={}", event.postId(), e);
-        // 통계 실패가 메인 로직에 영향주지 않도록 격리
+@Component
+@RequiredArgsConstructor
+public class PostEventHandler {
+
+    private final PostEventPublisher postEventPublisher;
+
+    @EventListener
+    public void handlePostCreated(PostCreated event) {
+        postEventPublisher.publish(event);
+    }
+
+    @EventListener
+    public void handlePostUpdated(PostUpdated event) {
+        postEventPublisher.publish(event);
+    }
+
+    @EventListener
+    public void handlePostDeleted(PostDeleted event) {
+        postEventPublisher.publish(event);
+    }
+}
+
+@Component
+@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true")
+@RequiredArgsConstructor
+public class PostEventProducer implements PostEventPublisher {
+    private final KafkaTemplate<String, PostEventMessage> kafkaTemplate;
+    private final RetryTemplate retryTemplate;
+
+    @Value("${see.kafka.topics.post-events:post-events}")
+    private String topic;
+
+    @Override
+    public void publish(DomainEvent event) {
+        PostEventMessage message = PostEventMessage.from(event); // 단순화한 예시 - 실제 구현은 이벤트 타입별 정적 팩토리를 사용
+        String key = message.postId().toString();
+
+        Runnable sender = () -> retryTemplate.execute(ctx -> {
+            try {
+                kafkaTemplate.send(topic, key, message).get();
+                return null;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException("Kafka 전송 인터럽트", e);
+            } catch (ExecutionException e) {
+                throw new IllegalStateException("Kafka 전송 실패", e.getCause());
+            }
+        }, recovery -> {
+            throw new IllegalStateException("Kafka 전송 실패", recovery.getLastThrowable());
+        });
+
+        if (TransactionSynchronizationManager.isSynchronizationActive()) {
+            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
+                @Override
+                public void afterCommit() {
+                    sender.run();
+                }
+            });
+        } else {
+            sender.run();
+        }
+    }
+}
+
+@Component
+@RequiredArgsConstructor
+class PostEventProcessor {
+    private final PostRepository postRepository;
+    private final PostSearchIndexer indexer;
+
+    void process(PostEventMessage message) {
+        switch (message.type()) {
+            case CREATED, UPDATED -> postRepository.findById(message.postId())
+                    .ifPresent(indexer::index);
+            case DELETED -> indexer.delete(message.postId());
+        }
     }
 }
 ```
+
+Kafka를 사용할 수 없는 환경에서는 `see.kafka.enabled=false`로 설정하여 `DirectPostEventPublisher`가 즉시 `PostEventProcessor`를 호출하도록 구성했습니다.
+
 #### 이벤트 처리 원칙
-- **트랜잭션 분리**: `@TransactionalEventListener(AFTER_COMMIT)`로 메인 트랜잭션과 분리
-- **비동기 처리**: `@Async`로 성능 최적화
-- **예외 격리**: try-catch로 사이드 이펙트 실패가 메인 로직에 영향 없도록 보장
-- **PostStatsManager 활용**: 인터페이스를 통한 관심사 분리
+- **트랜잭션 후처리**: Kafka 전송은 커밋 이후 `TransactionSynchronization`을 통해 수행
+- **메시지 키**: `postId`를 메시지 키로 사용해 파티션 내 순서를 보장
+- **재시도 정책**: `RetryTemplate`으로 최대 3회 재시도 후 실패를 호출자에게 전달
+- **컨슈머 에러 처리**: `DefaultErrorHandler`가 적용된 리스너 컨테이너로 재처리/로그를 관리
+- **Fallback 보장**: Kafka 비활성화 시에도 동일한 이벤트 플로우를 직접 실행
+- **테스트 격리**: Embedded Kafka 통합 테스트에서는 `${random.uuid}` 그룹 ID로 소비자를 분리
 
 ### 5. 헥사고날 아키텍처 적용
@@ -680,4 +735,4 @@ public class JpaMemberRepository implements MemberRepository {
 - **Spring Data JPA 레퍼런스**
 - **헥사고날 아키텍처 가이드**
 
-이 도메인 모델 문서는 See 프로젝트의 핵심 비즈니스 로직과 설계 원칙을 담고 있습니다. 지속적인 도메인 지식 발견과 함께 문서도 함께 발전시켜 나가야 합니다.
\ No newline at end of file
+이 도메인 모델 문서는 See 프로젝트의 핵심 비즈니스 로직과 설계 원칙을 담고 있습니다. 지속적인 도메인 지식 발견과 함께 문서도 함께 발전시켜 나가야 합니다.
diff --git "a/.docs/\354\232\251\354\226\264 \354\202\254\354\240\204 \353\254\270\354\204\234.md" "b/.docs/\354\232\251\354\226\264 \354\202\254\354\240\204 \353\254\270\354\204\234.md"
index 957e51af..90f0fdae 100644
--- "a/.docs/\354\232\251\354\226\264 \354\202\254\354\240\204 \353\254\270\354\204\234.md"
+++ "b/.docs/\354\232\251\354\226\264 \354\202\254\354\240\204 \353\254\270\354\204\234.md"
@@ -343,6 +343,20 @@ graph LR
 | **롤백** | Rollback | 트랜잭션을 취소하고 이전 상태로 복원 | "에러 발생 시 자동으로 롤백됩니다" |
 | **커밋** | Commit | 트랜잭션의 변경사항을 영구 반영 | "모든 작업이 성공하면 커밋합니다" |
 
+### 메시징 & 이벤트 용어
+
+| 한국어 | 영어 | 정의 | 사용 예시 |
+|--------|------|------|-----------|
+| **도메인 이벤트** | Domain Event | 애그리거트가 발행하는 상태 변화 알림 | "PostCreated 이벤트가 Kafka로 전송됩니다" |
+| **PostEventPublisher** | PostEventPublisher | 도메인 이벤트를 외부 파이프라인으로 전달하는 애플리케이션 포트 | "PostEventHandler는 PostEventPublisher를 통해 메시지를 보냅니다" |
+| **PostEventProducer** | PostEventProducer | KafkaTemplate을 사용해 이벤트 메시지를 브로커로 보내는 어댑터 | "재시도 정책이 적용된 PostEventProducer" |
+| **PostEventMessage** | PostEventMessage | Kafka에 전송되는 직렬화 가능한 포스트 이벤트 DTO | "postId와 이벤트 타입만 포함한 경량 메시지" |
+| **PostEventProcessor** | PostEventProcessor | Kafka 메시지를 읽어 최신 포스트 스냅샷을 색인 서비스로 위임하는 컴포넌트 | "메시지 타입에 따라 index/delete를 수행" |
+| **DirectPostEventPublisher** | DirectPostEventPublisher | Kafka 비활성화 시 즉시 색인을 수행하는 대체 어댑터 | "로컬 개발에서는 DirectPostEventPublisher가 사용됩니다" |
+| **Embedded Kafka** | Embedded Kafka | 테스트에서 경량 Kafka 브로커를 실행하는 도구 | "PostEventKafkaPipelineTest는 Embedded Kafka로 파이프라인을 검증" |
+| **RetryTemplate** | RetryTemplate | 일정한 정책으로 연산을 재시도하게 해주는 Spring Retry 유틸리티 | "Kafka 전송 실패 시 RetryTemplate이 최대 3회 재시도합니다" |
+| **see.kafka.enabled** | see.kafka.enabled | Kafka 사용 여부를 제어하는 애플리케이션 설정 | "테스트에서는 see.kafka.enabled=false로 즉시 처리" |
+
 ## 💡 디자인 패턴 용어
 
 ### 생성 패턴
@@ -402,4 +416,4 @@
 
 ---
 
-이 용어 사전은 See 프로젝트의 성장과 함께 지속적으로 업데이트됩니다. 새로운 용어나 수정이 필요한 내용이 있다면 언제든 팀에 공유해 주세요.
\ No newline at end of file
+이 용어 사전은 See 프로젝트의 성장과 함께 지속적으로 업데이트됩니다. 새로운 용어나 수정이 필요한 내용이 있다면 언제든 팀에 공유해 주세요.
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 92dba17b..9e10b48c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -32,66 +32,84 @@ jobs:
           --health-timeout 5s
           --health-retries 10
 
+      kafka:
+        image: confluentinc/cp-kafka:7.6.1
+        ports:
+          - 9092:9092
+        env:
+          CLUSTER_ID: 9UbE6ogDQKqe49oBCZZmnA
+          KAFKA_CLUSTER_ID: 9UbE6ogDQKqe49oBCZZmnA
+          KAFKA_PROCESS_ROLES: broker,controller
+          KAFKA_NODE_ID: 1
+          KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
+          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
+          KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
+          KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
+          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+          KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
+        options: >-
+          --health-cmd "nc -z localhost 9092"
+          --health-interval 15s
+          --health-timeout 10s
+          --health-retries 10
+
     steps:
-      - name: Check out repository
+      - name: 🧩 Checkout repository
        uses: actions/checkout@v4
        with:
          fetch-depth: 1
 
-      - name: Set up JDK 21
+      - name: ☕️ Set up JDK 21
        uses: actions/setup-java@v4
        with:
          java-version: '21'
          distribution: 'temurin'
          cache: gradle
 
-      - name: Create application.yml file
+      - name: 📝 Create application.yml
        run: |
          mkdir -p ./src/main/resources
          echo "${{ secrets.APPLICATION_YML }}" > ./src/main/resources/application.yml
 
-      - name: Disable IPv6
+      - name: 🧱 Disable IPv6 (network stability)
        run: |
          sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
          sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
 
-      - name: Validate Gradle wrapper
+      - name: 🧩 Validate Gradle Wrapper
        uses: gradle/wrapper-validation-action@v2
-        with:
-          min-wrapper-count: 1
-          allow-snapshots: false
 
-      - name: Make gradlew executable
+      - name: 🧍 Make gradlew executable
        run: chmod +x ./gradlew
 
-      - name: Verify Gradle Wrapper Checksum
-        run: |
-          if [ -f "gradle/wrapper/gradle-wrapper.properties" ]; then
-            echo "Gradle wrapper configuration verified"
-            cat gradle/wrapper/gradle-wrapper.properties
-          fi
-
-      - name: Setup Gradle
+      - name: ⚙️ Setup Gradle
        uses: gradle/gradle-build-action@v2
        with:
          gradle-build-scan-report: true
          cache-read-only: false
 
-      - name: Wait for Elasticsearch to be ready
+      - name: 🕐 Wait for Kafka and Elasticsearch
        run: |
-          echo "Waiting for Elasticsearch..."
-          for i in {1..20}; do
-            if curl -fsS http://localhost:9200 >/dev/null; then
-              echo "Elasticsearch is up!"
+          echo "Waiting for Elasticsearch and Kafka to be ready..."
+          for i in {1..25}; do
+            es_ready=$(curl -fsS http://localhost:9200/_cluster/health > /dev/null && echo "yes" || echo "no")
+            kafka_ready=$(nc -z localhost 9092 && echo "yes" || echo "no")
+
+            if [ "$es_ready" = "yes" ] && [ "$kafka_ready" = "yes" ]; then
+              echo "✅ All services are ready!"
              exit 0
            fi
-            echo "Retrying in 3s..."
-            sleep 3
-          done
-          echo "Elasticsearch did not become ready in time."
-          exit 1
-
-      - name: Run tests
+            echo "⏳ Waiting for services... retrying in 5s ($i/25)"
+            sleep 5
+          done
+
+          echo "❌ Services did not start in time"
+          exit 1
+
+      - name: 🧪 Run tests
        run: |
          ./gradlew clean test \
            --parallel \
@@ -102,8 +120,9 @@ jobs:
            -Dspring.profiles.active=test
        env:
          SPRING_ELASTICSEARCH_URIS: http://localhost:9200
+          SPRING_KAFKA_BOOTSTRAP_SERVERS: localhost:9092
 
-      - name: Upload test results
+      - name: 📦 Upload test results
        uses: actions/upload-artifact@v4
        if: always()
        with:
@@ -118,29 +137,39 @@ jobs:
    if: github.event_name == 'pull_request'
 
    steps:
-      - name: Check out repository
+      - name: 🧩 Checkout repository
        uses: actions/checkout@v4
        with:
          fetch-depth: 1
 
-      - name: Set up JDK 21
+      - name: ☕️ Set up JDK 21
        uses: actions/setup-java@v4
        with:
          java-version: '21'
          distribution: 'temurin'
          cache: gradle
 
-      - name: Create application.yml file
+      - name: 📝 Create application.yml
        run: |
          mkdir -p ./src/main/resources
          echo "${{ secrets.APPLICATION_YML }}" > ./src/main/resources/application.yml
 
-      - name: Setup Gradle
+      - name: ⚙️ Setup Gradle
        uses: gradle/gradle-build-action@v2
 
-      - name: Compile check
+      - name: 🧱 Compile check
        run: |
          ./gradlew compileJava compileTestJava \
            --parallel \
            --build-cache \
-            --configuration-cache
\ No newline at end of file
+            --configuration-cache
+
+      - name: 📦 Upload build results
+        uses: actions/upload-artifact@v4
+        if: always()
+        with:
+          name: build-results
+          path: |
+            build/classes/
+            build/libs/
+          retention-days: 7
diff --git a/README.md b/README.md
index 25221c31..4e546bf1 100644
--- a/README.md
+++ b/README.md
@@ -66,12 +66,21 @@
 - **Application Layer**: 유스케이스 조율과 트랜잭션 관리
   - Primary Port: MemberManager, PostManager, CommentManager
   - Secondary Port: MemberRepository, PostRepository, PostStatsRepository
-  - 이벤트 핸들러: PostStatsEventHandler
+  - 이벤트 핸들러: PostEventHandler, PostStatsEventHandler
 - **Adapter Layer**: 외부 시스템과의 연동
   - Web API: REST 엔드포인트
   - Security: JWT 인증/인가
   - Persistence: JPA 구현체
+  - Integration: Kafka 프로듀서/컨슈머, Elasticsearch 색인기
+
+### 이벤트 파이프라인
+
+1. **도메인 이벤트 발행**: Post, Comment 등 애그리거트가 `AbstractAggregateRoot`를 통해 이벤트를 수집합니다.
+2. **애플리케이션 이벤트**: `PostEventHandler`가 스프링 애플리케이션 이벤트를 받아 `PostEventPublisher`(포트)에 위임합니다.
+3. **Kafka 전송**: `PostEventProducer`는 트랜잭션 커밋 이후 Kafka로 메시지를 전송하며, `postId`를 메시지 키로 사용하고 `RetryTemplate`으로 재시도 정책을 적용합니다.
+4. **컨슈머 처리**: `PostEventConsumer`는 `DefaultErrorHandler`가 적용된 리스너 컨테이너에서 메시지를 소비하고, `PostEventProcessor`가 최신 게시글을 재조회해 Elasticsearch 색인을 갱신합니다.
+5. **Fallback 모드**: `see.kafka.enabled=false`일 때는 `DirectPostEventPublisher`가 즉시 색인 작업을 수행하여 Kafka 없이도 동일한 로직을 유지합니다.
 
 ## 기술 스택
 
@@ -80,10 +89,13 @@
 - **Spring Boot 3.5.4**: 최신 스프링 부트로 개발 생산성 향상
 - **Spring Data JPA**: 데이터 접근 계층 추상화
 - **Spring Security**: 인증/인가 보안 체계
+- **Spring Retry**: 외부 시스템 연동 시 재시도 정책 지원
 
-### Database
+### Storage & Messaging
 - **MySQL 8.0**: 운영 환경 데이터베이스
 - **H2**: 개발/테스트 환경 인메모리 데이터베이스
+- **Elasticsearch 8.x**: 검색 인덱스 저장소
+- **Apache Kafka (Confluent Platform 7.x)**: 도메인 이벤트 브로커
 - **Docker Compose**: 컨테이너 기반 개발 환경
 
 ### Testing & Quality
@@ -111,17 +123,27 @@
 git clone https://github.com/your-repo/see.git
 cd see
 ```
 
-### 2. 데이터베이스 시작
+### 2. 인프라 기동 (선택)
 ```bash
-docker-compose up -d mysql
+# MySQL, Elasticsearch, Kafka를 한 번에 실행하려면
+docker compose up -d mysql elasticsearch kafka
+
+# 또는 필요한 서비스만 선택적으로 실행할 수 있습니다.
 ```
+
+Kafka를 사용하지 않는 개발 환경이라면 위 단계를 생략하고 `see.kafka.enabled=false` 프로필을 활성화하면 됩니다.
+
 ### 3. 애플리케이션 실행
 ```bash
 ./gradlew bootRun
 ```
 
-### 4. API 테스트
+### 4. 환경 설정 팁
+- `see.kafka.enabled=false`: Kafka 없이 즉시 색인을 수행 (기본 테스트 프로필)
+- `spring.kafka.bootstrap-servers`: 로컬 Kafka 브로커 주소 (임베디드 테스트에서는 자동 주입)
+- `spring.elasticsearch.uris`: Elasticsearch 연결 주소 (기본값 `http://localhost:9200`)
+
+### 5. API 테스트
 ```bash
 # 회원 가입
 curl -X POST http://localhost:8080/api/members \
@@ -184,7 +206,8 @@ src/
 │   └── adapter/         # 어댑터 계층
 │       ├── webapi/      # REST API
 │       ├── security/    # 보안
-│       └── persistence/ # 데이터베이스
+│       ├── persistence/ # 데이터베이스
+│       └── integration/ # Kafka, Elasticsearch 연동
 └── test/                # 테스트 코드
     ├── domain/          # 단위 테스트 (순수 Java)
     ├── application/     # 통합 테스트
@@ -197,12 +220,16 @@ src/
 
 - **단위 테스트**: 도메인 로직을 순수 Java로 빠르게 검증
 - **통합 테스트**: 포트 구현체를 모의 객체로 대체하여 애플리케이션 서비스 검증
 - **인수 테스트**: 실제 어댑터를 사용하여 End-to-End 시나리오 검증
+- **Kafka 통합 테스트**: Embedded Kafka를 활용해 이벤트 파이프라인을 종단 간 검증
 
 ### 테스트 실행
 ```bash
-# 전체 테스트
+# 전체 테스트 (Kafka 비활성화 프로필 기본 적용)
 ./gradlew test
 
+# Kafka 파이프라인 통합 테스트만 실행 (Embedded Kafka)
+./gradlew test --tests PostEventKafkaPipelineTest
+
 # 테스트 커버리지 확인
 ./gradlew jacocoTestReport
 open build/jacocoHtml/index.html
diff --git a/build.gradle.kts b/build.gradle.kts
index f56f3a33..e80d36ba 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -48,6 +48,8 @@ dependencies {
     implementation("org.springframework.boot:spring-boot-starter-web")
     implementation("org.springframework.boot:spring-boot-starter-validation")
     implementation("org.springframework.security:spring-security-core")
+    implementation("org.springframework.kafka:spring-kafka")
+    implementation("org.springframework.retry:spring-retry")
     implementation("io.jsonwebtoken:jjwt-api:0.12.6")
     implementation("io.jsonwebtoken:jjwt-impl:0.12.6")
     implementation("io.jsonwebtoken:jjwt-jackson:0.12.6")
@@ -62,6 +64,7 @@ dependencies {
     testCompileOnly("org.projectlombok:lombok")
     testImplementation("org.springframework.boot:spring-boot-starter-test")
+    testImplementation("org.springframework.kafka:spring-kafka-test")
     testRuntimeOnly("org.junit.platform:junit-platform-launcher")
     testImplementation("org.junit-pioneer:junit-pioneer:2.3.0")
     testImplementation("org.mockito:mockito-core:5.18.0")
diff --git a/docker-compose.yml b/docker-compose.yml
index 1251589a..67f3ffa2 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,7 +1,10 @@
+version: "3.8"
+
 services:
   mysql:
     image: mysql:8.0
     container_name: see-mysql
+    restart: unless-stopped
     ports:
       - "3306:3306"
     environment:
@@ -14,30 +17,15 @@ services:
       - ./docker/mysql/init:/docker-entrypoint-initdb.d
     command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
     healthcheck:
-      test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
-      timeout: 20s
+      test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-proot"]
+      interval: 10s
+      timeout: 5s
       retries: 10
-    restart: unless-stopped
-
-#  mysql-test:
-#    image: mysql:8.0
-#    container_name: see-test-mysql
-#    ports:
-#      - "3307:3306"
-#    environment:
-#      MYSQL_ROOT_PASSWORD: root
-#      MYSQL_DATABASE: see_test
-#      MYSQL_USER: see_user
-#      MYSQL_PASSWORD: see_password
-#    volumes:
-#      - mysql_test_data:/var/lib/mysql
-#      - ./docker/mysql/init:/docker-entrypoint-initdb.d
-#    command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
-#    restart: unless-stopped
 
   elasticsearch:
     image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
     container_name: see-elasticsearch
+    restart: unless-stopped
     environment:
       - discovery.type=single-node
       - xpack.security.enabled=false
@@ -51,23 +39,64 @@ services:
       interval: 10s
       timeout: 5s
       retries: 5
-    restart: unless-stopped
 
   kibana:
     image: docker.elastic.co/kibana/kibana:8.12.0
     container_name: see-kibana
+    restart: unless-stopped
     ports:
       - "5601:5601"
     environment:
       ELASTICSEARCH_HOSTS: http://see-elasticsearch:9200
     depends_on:
-      - elasticsearch
+      elasticsearch:
+        condition: service_healthy
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.6.1
+    container_name: see-zookeeper
     restart: unless-stopped
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+    ports:
+      - "2181:2181"
+    healthcheck:
+      test: ["CMD-SHELL", "echo ruok | nc localhost 2181 | grep imok || exit 1"]
+      interval: 10s
+      timeout: 5s
+      retries: 10
+
+  kafka:
+    image: confluentinc/cp-kafka:7.6.1
+    container_name: see-kafka
+    restart: unless-stopped
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+
+      # 두 가지 리스너를 노출하여 컨테이너 내부/호스트 접근 모두 지원
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://see-kafka:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+
+      # 기본 동작 설정
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_LOG_RETENTION_HOURS: 168
+      KAFKA_DELETE_TOPIC_ENABLE: "true"
+
+    healthcheck:
+      test: [ "CMD", "bash", "-c", "kafka-topics --bootstrap-server localhost:9092 --list >/dev/null 2>&1" ]
+      interval: 10s
+      timeout: 5s
+      retries: 10
 
 volumes:
   mysql_data:
-    driver: local
-  mysql_test_data:
   es_data:
-    driver: local
-
diff --git a/src/main/java/dooya/see/adapter/integration/kafka/DirectPostEventPublisher.java b/src/main/java/dooya/see/adapter/integration/kafka/DirectPostEventPublisher.java
new file mode 100644
index 00000000..dbe2f0c4
--- /dev/null
+++ b/src/main/java/dooya/see/adapter/integration/kafka/DirectPostEventPublisher.java
@@ -0,0 +1,22 @@
+package dooya.see.adapter.integration.kafka;
+
+import dooya.see.application.post.required.PostEventPublisher;
+import dooya.see.domain.shared.DomainEvent;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "false", matchIfMissing = true)
+public class DirectPostEventPublisher implements PostEventPublisher {
+    private final PostEventProcessor postEventProcessor;
+
+    @Override
+    public void publish(DomainEvent event) {
+        log.debug("Kafka 비활성화 상태 - 도메인 이벤트를 직접 처리합니다: {}", event);
+        postEventProcessor.process(event);
+    }
+}
diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java
new file mode 100644
index 00000000..48aeda9b
--- /dev/null
+++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java
@@ -0,0 +1,25 @@
+package dooya.see.adapter.integration.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true")
+public class PostEventConsumer {
+    private final PostEventProcessor postEventProcessor;
+
+    @KafkaListener(
+            topics = "${see.kafka.topics.post-events:post-events}",
+            groupId = "${spring.kafka.consumer.group-id:post-indexer-group}",
+            containerFactory = "kafkaListenerContainerFactory"
+    )
+    public void consume(PostEventMessage message) {
+        log.debug("Kafka 이벤트 수신: {}", message);
+        postEventProcessor.process(message);
+    }
+}
diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventMessage.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventMessage.java
new file mode 100644
index 00000000..decf3a25
--- /dev/null
+++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventMessage.java
@@ -0,0 +1,17 @@
+package dooya.see.adapter.integration.kafka;
+
+import java.io.Serializable;
+
+public record PostEventMessage(PostEventType type, Long postId) implements Serializable {
+    static PostEventMessage created(Long postId) {
+        return new PostEventMessage(PostEventType.CREATED, postId);
+    }
+
+    static PostEventMessage updated(Long postId) {
+        return new PostEventMessage(PostEventType.UPDATED, postId);
+    }
+
+    static PostEventMessage deleted(Long postId) {
+        return new PostEventMessage(PostEventType.DELETED, postId);
+    }
+}
diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventProcessor.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventProcessor.java
new file mode 100644
index 00000000..bcb6be5d
--- /dev/null
+++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventProcessor.java
@@ -0,0 +1,99 @@
+package dooya.see.adapter.integration.kafka;
+
+import dooya.see.application.post.required.PostRepository;
+import dooya.see.application.post.required.PostSearchIndexer;
+import dooya.see.domain.post.Post;
+import dooya.see.domain.post.event.PostCreated;
+import dooya.see.domain.post.event.PostDeleted;
+import dooya.see.domain.post.event.PostUpdated;
+import dooya.see.domain.shared.DomainEvent;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+class PostEventProcessor {
+    private final PostSearchIndexer postSearchIndexer;
+    private final PostRepository postRepository;
+
+    void process(DomainEvent event) {
+        if (event instanceof PostCreated created) {
+            handlePostCreated(created);
+        } else if (event instanceof PostUpdated updated) {
+            handlePostUpdated(updated);
+        } else if (event instanceof PostDeleted deleted) {
+            handlePostDeleted(deleted);
+        } else {
+            log.debug("처리할 필요 없는 이벤트 타입: {}", event.getClass().getName());
+        }
+    }
+
+    void process(PostEventMessage message) {
+        if (message == null) {
+            return;
+        }
+
+        try {
+            switch (message.type()) {
+                case CREATED, UPDATED -> indexLatestSnapshot(message.postId());
+                case DELETED -> deleteIndex(message.postId());
+                default -> log.debug("처리할 필요 없는 메시지 타입: {}", message.type());
+            }
+        } catch (Exception ex) {
+            log.error("Kafka 이벤트 메시지 처리 실패: {}", message, ex);
+        }
+    }
+
+    private void indexLatestSnapshot(Long postId) {
+        postRepository.findById(postId)
+                .ifPresentOrElse(this::safeIndex,
+                        () -> log.warn("색인 대상 게시글을 찾을 수 없습니다: postId={}", postId));
+    }
+
+    private void deleteIndex(Long postId) {
+        try {
+            log.debug("Kafka 메시지 처리 - 게시글 삭제 색인 제거: {}", postId);
+            postSearchIndexer.delete(postId);
+        } catch (Exception e) {
+            log.error("게시글 삭제 색인 처리 실패: postId={}", postId, e);
+        }
+    }
+
+    private void safeIndex(Post post) {
+        try {
+            log.debug("Kafka 메시지 처리 - 게시글 색인 갱신: {}", post.getId());
+            postSearchIndexer.index(post);
+        } catch (Exception e) {
+            log.error("게시글 색인 처리 실패: postId={}", post.getId(), e);
+        }
+    }
+
+    private void handlePostCreated(PostCreated event) {
+        try {
+            log.debug("도메인 이벤트 처리 - 게시글 생성 색인: {}", event.postId());
+            postSearchIndexer.index(event.post());
+        } catch (Exception e) {
+            log.error("게시글 생성 색인 처리 실패: {}", event, e);
+        }
+    }
+
+    private void handlePostUpdated(PostUpdated event) {
+        try {
+            log.debug("도메인 이벤트 처리 - 게시글 업데이트 색인: {}", event.postId());
+            postSearchIndexer.index(event.post());
+        } catch (Exception e) {
+            log.error("게시글 업데이트 색인 처리 실패: {}", event, e);
+        }
+    }
+
+    private void handlePostDeleted(PostDeleted event) {
+        try {
+            log.debug("도메인 이벤트 처리 - 게시글 삭제 색인 제거: {}", event.postId());
+            postSearchIndexer.delete(event.postId());
+        } catch (Exception e) {
+            log.error("게시글 삭제 색인 처리 실패: {}", event, e);
+        }
+    }
+}
diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java
new file mode 100644
index 00000000..c50a1bc4
--- /dev/null
+++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java
@@ -0,0 +1,103 @@
+package dooya.see.adapter.integration.kafka;
+
+import dooya.see.application.post.required.PostEventPublisher;
+import dooya.see.domain.post.event.PostCreated;
+import dooya.see.domain.post.event.PostDeleted;
+import dooya.see.domain.post.event.PostUpdated;
+import dooya.see.domain.shared.DomainEvent;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.retry.support.RetryTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.support.TransactionSynchronization;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
+
+import java.util.concurrent.ExecutionException;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true")
+public class PostEventProducer implements PostEventPublisher {
+    private final KafkaTemplate<String, PostEventMessage> kafkaTemplate;
+    private final RetryTemplate retryTemplate;
+
+    @Value("${see.kafka.topics.post-events:post-events}")
+    private String topic;
+
+    @Override
+    public void publish(DomainEvent event) {
+        PostEventMessage message = toMessage(event);
+        if (message == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Kafka 전송 대상이 아닌 이벤트입니다: {}", event.getClass().getName());
+            }
+            return;
+        }
+
+        String key = message.postId() != null ? message.postId().toString() : null;
+
+        Runnable sendTask = () -> {
+            try {
+                sendWithRetry(key, message);
+            } catch (Exception unexpected) {
+                log.error("Kafka 이벤트 발행 중 처리되지 않은 예외 발생: {}", message, unexpected);
+            }
+        };
+
+        if (TransactionSynchronizationManager.isSynchronizationActive()) {
+            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
+                @Override
+                public void afterCommit() {
+                    sendTask.run();
+                }
+            });
+        } else {
+            sendTask.run();
+        }
+    }
+
+    private PostEventMessage toMessage(DomainEvent event) {
+        if (event instanceof PostCreated created) {
+            return PostEventMessage.created(created.postId());
+        }
+        if (event instanceof PostUpdated updated) {
+            return PostEventMessage.updated(updated.postId());
+        }
+        if (event instanceof PostDeleted deleted) {
+            return PostEventMessage.deleted(deleted.postId());
+        }
+        return null;
+    }
+
+    private void sendWithRetry(String key, PostEventMessage message) {
+        retryTemplate.execute(retryContext -> {
+            try {
+                var result = kafkaTemplate.send(topic, key, message).get();
+                if (result != null && log.isDebugEnabled()) {
+                    log.debug("Kafka 이벤트 발행 성공: topic={}, partition={}, offset={}, key={}, payload={}",
+                            result.getRecordMetadata().topic(),
+                            result.getRecordMetadata().partition(),
+                            result.getRecordMetadata().offset(),
+                            key,
+                            message);
+                }
+                return null;
+            } catch (InterruptedException interrupted) {
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException("Kafka 전송이 인터럽트되었습니다", interrupted);
+            } catch (ExecutionException executionException) {
+                Throwable cause = executionException.getCause() != null ? executionException.getCause() : executionException;
+                throw new IllegalStateException("Kafka 전송 실패", cause);
+            }
+        }, recoveryContext -> {
+            Throwable lastError = recoveryContext.getLastThrowable();
+            log.error("Kafka 이벤트 발행 실패(재시도 완료): {}", message, lastError);
+            // TODO: DLQ 또는 모니터링 알림 연동을 고려합니다.
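+            // 참고: 복구 콜백이 null을 반환하므로 재시도 소진 시 publish() 호출자에게 예외가
+            // 전파되지 않고 이벤트는 유실됩니다. 유실을 허용할 수 없다면 여기서 예외를 다시
+            // 던지거나 아웃박스 테이블에 기록하는 방안을 검토할 수 있습니다.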
+            return null;
+        });
+    }
+}
diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventType.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventType.java
new file mode 100644
index 00000000..0bbbf353
--- /dev/null
+++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventType.java
@@ -0,0 +1,7 @@
+package dooya.see.adapter.integration.kafka;
+
+public enum PostEventType {
+    CREATED,
+    UPDATED,
+    DELETED
+}
diff --git a/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java
new file mode 100644
index 00000000..e723ee06
--- /dev/null
+++ b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java
@@ -0,0 +1,57 @@
+package dooya.see.adapter.integration.kafka.config;
+
+import dooya.see.adapter.integration.kafka.PostEventMessage;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@EnableKafka
+@Configuration
+@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true")
+public class KafkaConsumerConfig {
+    private final KafkaProperties kafkaProperties;
+
+    public KafkaConsumerConfig(KafkaProperties kafkaProperties) {
+        this.kafkaProperties = kafkaProperties;
+    }
+
+    @Bean
+    public ConsumerFactory<String, PostEventMessage> postEventConsumerFactory() {
+        Map<String, Object> config = new HashMap<>(kafkaProperties.buildConsumerProperties(null));
+        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+        config.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "post-indexer-group");
+        config.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        JsonDeserializer<PostEventMessage> jsonDeserializer = new JsonDeserializer<>(PostEventMessage.class);
+        jsonDeserializer.addTrustedPackages("dooya.see.adapter.integration.kafka");
+        jsonDeserializer.setRemoveTypeHeaders(false);
+        jsonDeserializer.setUseTypeMapperForKey(false);
+
+        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), jsonDeserializer);
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, PostEventMessage> kafkaListenerContainerFactory(
+            ConsumerFactory<String, PostEventMessage> postEventConsumerFactory
+    ) {
+        ConcurrentKafkaListenerContainerFactory<String, PostEventMessage> factory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(postEventConsumerFactory);
+        factory.setConcurrency(1);
+        factory.setCommonErrorHandler(new DefaultErrorHandler());
+        return factory;
+    }
+}
diff --git a/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java
new file mode 100644
index 00000000..febe498b
--- /dev/null
+++ b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java
@@ -0,0 +1,50 @@
+package dooya.see.adapter.integration.kafka.config;
+
+import dooya.see.adapter.integration.kafka.PostEventMessage;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.retry.support.RetryTemplate;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true")
+public class KafkaProducerConfig {
+    private final KafkaProperties kafkaProperties;
+
+    public KafkaProducerConfig(KafkaProperties kafkaProperties) {
+        this.kafkaProperties = kafkaProperties;
+    }
+
+    @Bean
+    public ProducerFactory<String, PostEventMessage> postEventProducerFactory() {
+        Map<String, Object> config = new HashMap<>(kafkaProperties.buildProducerProperties(null));
+        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+        config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);
+
+        return new DefaultKafkaProducerFactory<>(config);
+    }
+
+    @Bean
+    public KafkaTemplate<String, PostEventMessage> postEventKafkaTemplate(ProducerFactory<String, PostEventMessage> postEventProducerFactory) {
+        return new KafkaTemplate<>(postEventProducerFactory);
+    }
+
+    @Bean
+    public RetryTemplate kafkaRetryTemplate() {
+        return RetryTemplate.builder()
+                .maxAttempts(3)
+                .exponentialBackoff(200, 2.0, 2_000)
+                .build();
+    }
+}
diff --git a/src/main/java/dooya/see/application/post/PostEventHandler.java b/src/main/java/dooya/see/application/post/PostEventHandler.java
index 11344965..2c4426eb 100644
--- a/src/main/java/dooya/see/application/post/PostEventHandler.java
+++ b/src/main/java/dooya/see/application/post/PostEventHandler.java
@@ -1,9 +1,10 @@
 package dooya.see.application.post;
 
-import dooya.see.application.post.required.PostSearchIndexer;
+import dooya.see.application.post.required.PostEventPublisher;
 import dooya.see.domain.post.event.PostCreated;
 import dooya.see.domain.post.event.PostDeleted;
 import dooya.see.domain.post.event.PostUpdated;
+import dooya.see.domain.shared.DomainEvent;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.event.EventListener;
@@ -13,23 +14,25 @@
 @Component
 @RequiredArgsConstructor
 public class PostEventHandler {
-    private final PostSearchIndexer postSearchIndexer;
+    private final PostEventPublisher postEventPublisher;
 
     @EventListener
     public void handlePostCreated(PostCreated event) {
-        log.info("[ES] 게시글 생성 색인: {}", event.postId());
-        postSearchIndexer.index(event.post());
+        publish(event);
     }
 
     @EventListener
     public void handlePostUpdated(PostUpdated event) {
-        log.info("[ES] 게시글 업데이트 색인: {}", event.postId());
-        postSearchIndexer.index(event.post());
+        publish(event);
     }
 
     @EventListener
     public void handlePostDeleted(PostDeleted event) {
-        log.info("[ES] 게시글 색인 삭제: {}", event.postId());
-        postSearchIndexer.delete(event.postId());
+        publish(event);
+    }
+
+    private void publish(DomainEvent event) {
+        log.debug("도메인 이벤트를 외부 파이프라인으로 위임: {}", event);
+        postEventPublisher.publish(event);
     }
 }
diff --git a/src/main/java/dooya/see/application/post/required/PostEventPublisher.java b/src/main/java/dooya/see/application/post/required/PostEventPublisher.java
new file mode 100644
index 00000000..9f79b592
--- /dev/null
+++ b/src/main/java/dooya/see/application/post/required/PostEventPublisher.java
@@ -0,0 +1,7 @@
+package dooya.see.application.post.required;
+
+import dooya.see.domain.shared.DomainEvent;
+
+public interface PostEventPublisher {
+    void publish(DomainEvent event);
+}
diff --git a/src/main/java/dooya/see/domain/post/Post.java b/src/main/java/dooya/see/domain/post/Post.java
index 55ccfd38..b1ec8713 100644
--- a/src/main/java/dooya/see/domain/post/Post.java
+++ b/src/main/java/dooya/see/domain/post/Post.java
@@ -20,7 +20,8 @@
 @Builder(access = AccessLevel.PRIVATE)
 @AllArgsConstructor(access = AccessLevel.PRIVATE)
 @NoArgsConstructor(access = AccessLevel.PROTECTED)
-public class Post extends AbstractAggregateRoot<Post> {
+public class Post extends AbstractAggregateRoot<Post> implements java.io.Serializable {
+    private static final long serialVersionUID = 1L;
 
     @Embedded
     private PostContent content;
diff --git a/src/main/java/dooya/see/domain/post/PostContent.java b/src/main/java/dooya/see/domain/post/PostContent.java
index e3faed89..98d7ac23 100644
--- a/src/main/java/dooya/see/domain/post/PostContent.java
+++ b/src/main/java/dooya/see/domain/post/PostContent.java
@@ -9,7 +9,9 @@ public record PostContent(
         String title,
         @Column(name = "body", length = 50000, nullable = false)
         String body
-) {
+) implements java.io.Serializable {
+    private static final long serialVersionUID = 1L;
+
     public PostContent {
         validateTitle(title);
         validateBody(body);
diff --git a/src/main/java/dooya/see/domain/post/PostMetaData.java b/src/main/java/dooya/see/domain/post/PostMetaData.java
index 488cc3f4..63b7d883 100644
--- a/src/main/java/dooya/see/domain/post/PostMetaData.java
+++ b/src/main/java/dooya/see/domain/post/PostMetaData.java
@@ -2,6 +2,7 @@
 
 import jakarta.persistence.Embeddable;
 
+import java.io.Serializable;
 import java.time.LocalDateTime;
 
 @Embeddable
@@ -11,7 +12,8 @@ public record PostMetaData(
         LocalDateTime modifiedAt,
         LocalDateTime publishedAt
-) {
+) implements Serializable {
+    private static final long serialVersionUID = 1L;
 
     public static PostMetaData create() {
         LocalDateTime now = LocalDateTime.now();
diff --git a/src/main/java/dooya/see/domain/post/Tag.java b/src/main/java/dooya/see/domain/post/Tag.java
index 59e7e7f6..aefc0dd1 100644
--- a/src/main/java/dooya/see/domain/post/Tag.java
+++ b/src/main/java/dooya/see/domain/post/Tag.java
@@ -2,10 +2,12 @@
 
 import jakarta.persistence.Embeddable;
 
+import java.io.Serializable;
 import java.util.regex.Pattern;
 
 @Embeddable
-public record Tag(String name) {
+public record Tag(String name) implements Serializable {
+    private static final long serialVersionUID = 1L;
     private static final Pattern TAG_PATTERN = Pattern.compile("^[가-힣a-zA-Z0-9]+$");
 
     public Tag {
diff --git a/src/main/java/dooya/see/domain/post/event/PostCreated.java b/src/main/java/dooya/see/domain/post/event/PostCreated.java
index 6ef91289..56a5b623 100644
--- a/src/main/java/dooya/see/domain/post/event/PostCreated.java
+++ b/src/main/java/dooya/see/domain/post/event/PostCreated.java
@@ -4,11 +4,13 @@
 import dooya.see.domain.post.PostCategory;
 import dooya.see.domain.shared.DomainEvent;
 
+import java.io.Serializable;
+
 public record PostCreated(
         Long postId,
        Long memberId,
         PostCategory category,
         boolean publishImmediately,
         Post post
-) implements DomainEvent {
+) implements DomainEvent, Serializable {
 }
diff --git a/src/main/java/dooya/see/domain/post/event/PostUpdated.java b/src/main/java/dooya/see/domain/post/event/PostUpdated.java
index b6baeaae..f6628e12 100644
--- a/src/main/java/dooya/see/domain/post/event/PostUpdated.java
+++ b/src/main/java/dooya/see/domain/post/event/PostUpdated.java
@@ -3,6 +3,8 @@
 import dooya.see.domain.post.Post;
 import dooya.see.domain.shared.DomainEvent;
 
+import java.io.Serializable;
+
 /**
  * 게시글이 수정되었을 때 발생하는 도메인 이벤트
  */
@@ -14,4 +16,4 @@ public record PostUpdated(
         boolean categoryChanged,
         boolean tagsChanged,
         Post post
-) implements DomainEvent {}
+) implements DomainEvent, Serializable {}
diff --git a/src/test/java/dooya/see/application/post/PostEventDirectPipelineTest.java b/src/test/java/dooya/see/application/post/PostEventDirectPipelineTest.java
new file mode 100644
index 00000000..a0dfa9e2
--- /dev/null
+++ b/src/test/java/dooya/see/application/post/PostEventDirectPipelineTest.java
@@ -0,0 +1,113 @@
+package dooya.see.application.post;
+
+import dooya.see.application.post.provided.PostManager;
+import dooya.see.application.post.required.PostSearchIndexer;
+import dooya.see.domain.post.Post;
+import dooya.see.domain.post.dto.PostUpdateRequest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static dooya.see.domain.post.PostFixture.createPostRequest;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@SpringBootTest(properties = "see.kafka.enabled=false")
+class PostEventDirectPipelineTest {
+
+    private final PostManager postManager;
+    private final RecordingPostSearchIndexer recordingIndexer;
+
+    PostEventDirectPipelineTest(PostManager postManager, RecordingPostSearchIndexer recordingIndexer) {
+        this.postManager = postManager;
+        this.recordingIndexer = recordingIndexer;
+    }
+
+    @BeforeEach
+    void setUp() {
+        recordingIndexer.reset();
+    }
+
+    @Test
+    void Kafka_비활성화_시에도_도메인_이벤트가_즉시_색인된다() {
+        Post created = postManager.create(createPostRequest(), authorId());
+
+        assertThat(recordingIndexer.indexedPosts())
+                .anyMatch(post -> post.getId().equals(created.getId()));
+    }
+
+    @Test
+    void Kafka_비활성화_시_수정_이벤트도_즉시_처리된다() {
+        Post created = postManager.create(createPostRequest(), authorId());
+        recordingIndexer.reset();
+
+        PostUpdateRequest updateRequest = new PostUpdateRequest(
+                Optional.of("직접 변경된 제목"),
+                Optional.of("직접 변경된 본문"),
+                Optional.empty(),
+                Optional.empty()
+        );
+
+        Post updated = postManager.update(updateRequest, created.getId(), authorId());
+
+        assertThat(recordingIndexer.indexedPosts())
+                .anyMatch(post -> post.getId().equals(updated.getId())
+                        && post.getContent().title().equals("직접 변경된 제목"));
+    }
+
+    @Test
+    void Kafka_비활성화_시_삭제_이벤트도_즉시_처리된다() {
+        Post created = postManager.create(createPostRequest(), authorId());
+        recordingIndexer.reset();
+
+        postManager.delete(created.getId(), authorId());
+
+        assertThat(recordingIndexer.deletedPostIds()).contains(created.getId());
+    }
+
+    private static Long authorId() {
+        return 1L;
+    }
+
+    static class RecordingPostSearchIndexer implements PostSearchIndexer {
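+        // 색인/삭제 호출을 기록하는 테스트 더블. Kafka 파이프라인 테스트에서는 컨슈머 스레드가
+        // 기록하고 테스트 스레드가 검증하므로, 같은 형태를 쓰는 이 클래스도 스레드 안전한 컬렉션을 사용한다.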
+        private final CopyOnWriteArrayList<Post> indexed = new CopyOnWriteArrayList<>();
+        private final CopyOnWriteArrayList<Long> deleted = new CopyOnWriteArrayList<>();
+
+        @Override
+        public void index(Post post) {
+            indexed.add(post);
+        }
+
+        @Override
+        public void delete(Long postId) {
+            deleted.add(postId);
+        }
+
+        List<Post> indexedPosts() {
+            return List.copyOf(indexed);
+        }
+
+        List<Long> deletedPostIds() {
+            return List.copyOf(deleted);
+        }
+
+        void reset() {
+            indexed.clear();
+            deleted.clear();
+        }
+    }
+
+    @org.springframework.boot.test.context.TestConfiguration
+    static class TestConfiguration {
+        @Bean
+        @Primary
+        RecordingPostSearchIndexer recordingPostSearchIndexer() {
+            return new RecordingPostSearchIndexer();
+        }
+    }
+}
diff --git a/src/test/java/dooya/see/application/post/PostEventHandlerTest.java b/src/test/java/dooya/see/application/post/PostEventHandlerTest.java
deleted file mode 100644
index c55ccfe5..00000000
--- a/src/test/java/dooya/see/application/post/PostEventHandlerTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package dooya.see.application.post;
-
-import dooya.see.adapter.search.elasticsearch.repository.PostSearchElasticsearchRepository;
-import dooya.see.application.post.provided.PostManager;
-import dooya.see.domain.post.Post;
-import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
-
-import java.time.Duration;
-
-import static dooya.see.domain.post.PostFixture.createPostRequest;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.awaitility.Awaitility.await;
-
-@SpringBootTest
-record PostEventHandlerTest(
-        PostManager postManager,
-        PostSearchElasticsearchRepository searchRepository) {
-    private static final Long AUTHOR_ID = 1L;
-
-    @Test
-    void 게시글_생성시_자동으로_색인된다() {
-        Post post = postManager.create(createPostRequest(), AUTHOR_ID);
-
-        await().atMost(Duration.ofSeconds(2)).untilAsserted(() ->
-                assertThat(searchRepository.findById(String.valueOf(post.getId()))).isPresent());
-    }
-}
\ No newline at end of file
diff --git a/src/test/java/dooya/see/application/post/PostEventKafkaPipelineTest.java b/src/test/java/dooya/see/application/post/PostEventKafkaPipelineTest.java
new file mode 100644
index 00000000..a1262e54
--- /dev/null
+++ b/src/test/java/dooya/see/application/post/PostEventKafkaPipelineTest.java
@@ -0,0 +1,135 @@
+package dooya.see.application.post;
+
+import dooya.see.application.post.provided.PostManager;
+import dooya.see.application.post.required.PostSearchIndexer;
+import dooya.see.domain.post.Post;
+import dooya.see.domain.post.dto.PostCreateRequest;
+import dooya.see.domain.post.dto.PostUpdateRequest;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.context.TestPropertySource;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static dooya.see.domain.post.PostFixture.createPostRequest;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@SpringBootTest(properties = {
+        "see.kafka.enabled=true",
+        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+        "spring.kafka.consumer.group-id=${random.uuid}"
+})
+@EmbeddedKafka(partitions = 1, topics = "${see.kafka.topics.post-events:post-events}")
+@TestPropertySource(properties = "spring.kafka.consumer.auto-offset-reset=earliest")
+class PostEventKafkaPipelineTest {
+
+    private final PostManager postManager;
+    private final RecordingPostSearchIndexer recordingIndexer;
+
+    PostEventKafkaPipelineTest(PostManager postManager, RecordingPostSearchIndexer recordingIndexer) {
+        this.postManager = postManager;
+        this.recordingIndexer = recordingIndexer;
+    }
+
+    @BeforeEach
+    void setUp() {
+        recordingIndexer.reset();
+    }
+
+    @Test
+    void 게시글_생성_이벤트는_Kafka를_통해_색인된다() {
+        Post created = postManager.create(createPostRequest(), authorId());
+
+        Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() ->
+                assertThat(recordingIndexer.indexedPosts())
+                        .anyMatch(post -> post.getId().equals(created.getId())));
+    }
+
+    @Test
+    void 게시글_수정_이벤트는_Kafka를_통해_색인이_갱신된다() {
+        Post created = postManager.create(createPostRequest(), authorId());
+        Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() ->
+                assertThat(recordingIndexer.indexedPosts())
+                        .anyMatch(post -> post.getId().equals(created.getId())));
+
+        recordingIndexer.reset();
+
+        PostUpdateRequest updateRequest = new PostUpdateRequest(
+                Optional.of("변경된 제목"),
+                Optional.of("변경된 본문"),
+                Optional.empty(),
+                Optional.empty()
+        );
+
+        Post updated = postManager.update(updateRequest, created.getId(), authorId());
+
+        Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() ->
+                assertThat(recordingIndexer.indexedPosts())
+                        .anyMatch(post -> post.getId().equals(updated.getId())
+                                && post.getContent().title().equals("변경된 제목")));
+    }
+
+    @Test
+    void 게시글_삭제_이벤트는_Kafka를_통해_색인이_삭제된다() {
+        Post created = postManager.create(createPostRequest(), authorId());
+        Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() ->
+                assertThat(recordingIndexer.indexedPosts())
+                        .anyMatch(post -> post.getId().equals(created.getId())));
+
+        recordingIndexer.reset();
+
+        postManager.delete(created.getId(), authorId());
+
+        Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() ->
+                assertThat(recordingIndexer.deletedPostIds()).contains(created.getId()));
+    }
+
+    private static Long authorId() {
+        return 1L;
+    }
+
+    static class RecordingPostSearchIndexer implements PostSearchIndexer {
+        private final CopyOnWriteArrayList<Post> indexed = new CopyOnWriteArrayList<>();
+        private final CopyOnWriteArrayList<Long> deleted = new CopyOnWriteArrayList<>();
+
+        @Override
+        public void index(Post post) {
+            indexed.add(post);
+        }
+
+        @Override
+        public void delete(Long postId) {
+            deleted.add(postId);
+        }
+
+        List<Post> indexedPosts() {
+            return List.copyOf(indexed);
+        }
+
+        List<Long> deletedPostIds() {
+            return List.copyOf(deleted);
+        }
+
+        void reset() {
+            indexed.clear();
+            deleted.clear();
+        }
+    }
+
+    @org.springframework.boot.test.context.TestConfiguration
+    static class TestConfiguration {
+        @Bean
+        @Primary
+        RecordingPostSearchIndexer recordingPostSearchIndexer() {
+            return new RecordingPostSearchIndexer();
+        }
+    }
+}
diff --git a/src/test/resources/application-test.yml b/src/test/resources/application-test.yml
index c8dbdac6..d84f75f7 100644
--- a/src/test/resources/application-test.yml
+++ b/src/test/resources/application-test.yml
@@ -1,23 +1,12 @@
 spring:
-  datasource:
-    url: jdbc:mysql://localhost:3306/see_test?serverTimezone=Asia/Seoul&useSSL=false&allowPublicKeyRetrieval=true
-    username: see_user
-    password: see_password
-    driver-class-name: com.mysql.cj.jdbc.Driver
-
   jpa:
     hibernate:
-      ddl-auto: none
-    properties:
-      hibernate:
-        dialect: org.hibernate.dialect.MySQL8Dialect # ✅ 이 한 줄 추가!
-        jdbc:
-          batch_size: 1000
-          order_updates: true
-          order_inserts: true
-    show-sql: true # SQL 출력 원하면 추가
+      ddl-auto: update
 
 logging:
   level:
     dooya.see: INFO
-    org.hibernate.SQL: DEBUG
\ No newline at end of file
+
+see:
+  kafka:
+    enabled: false
diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties
new file mode 100644
index 00000000..f841722a
--- /dev/null
+++ b/src/test/resources/application.properties
@@ -0,0 +1 @@
+spring.profiles.active=test