Skip to content

Commit 82eedde

Browse files
committed
feat: transfer-relay 멀티 인스턴스가 아닌, 단일 스레드에서 multiThread 방식으로 개선
1 parent 6fab2be commit 82eedde

File tree

25 files changed

+641
-2029
lines changed

25 files changed

+641
-2029
lines changed
Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,10 @@
1-
//plugins {
2-
// kotlin("jvm")
3-
// kotlin("plugin.spring")
4-
//}
5-
//
6-
//dependencies {
7-
// implementation(project(":transfer-domain"))
8-
// implementation(project(":common-application"))
9-
// implementation(project(":common-domain"))
10-
//
11-
// implementation("org.springframework:spring-context")
12-
// implementation("org.springframework:spring-tx")
13-
//
14-
// testImplementation("io.kotest:kotest-runner-junit5")
15-
// testImplementation("io.kotest:kotest-assertions-core")
16-
//}
17-
181
plugins {
192
id("transentia.spring-library")
203
}
214

225
dependencies {
23-
// 프로젝트 의존성
246
implementation(project(":transfer-domain"))
257
implementation(project(":common-application"))
268
implementation(project(":common-domain"))
27-
28-
// 특화된 의존성 (있다면 추가)
29-
// 예: implementation("org.springframework.retry:spring-retry")
9+
implementation(project(":kafka-model"))
3010
}
Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,66 +1,97 @@
11
package io.github.hyungkishin.transentia.application
22

3+
import io.github.hyungkishin.transentia.application.mapper.OutboxEventMapper
34
import io.github.hyungkishin.transentia.application.provided.TransactionRegister
45
import io.github.hyungkishin.transentia.application.provided.command.TransferRequestCommand
56
import io.github.hyungkishin.transentia.application.required.TransactionRepository
7+
import io.github.hyungkishin.transentia.application.required.TransferEventsOutboxRepository
68
import io.github.hyungkishin.transentia.application.required.UserRepository
79
import io.github.hyungkishin.transentia.application.required.command.TransferResponseCommand
810
import io.github.hyungkishin.transentia.common.error.CommonError
911
import io.github.hyungkishin.transentia.common.error.DomainException
12+
import io.github.hyungkishin.transentia.common.message.transfer.TransferCompleted
1013
import io.github.hyungkishin.transentia.common.snowflake.IdGenerator
1114
import io.github.hyungkishin.transentia.common.snowflake.SnowFlakeId
1215
import io.github.hyungkishin.transentia.container.model.transaction.Transaction
1316
import io.github.hyungkishin.transentia.container.validator.transfer.TransferValidator
17+
import org.slf4j.LoggerFactory
1418
import org.springframework.context.ApplicationEventPublisher
1519
import org.springframework.stereotype.Service
1620
import org.springframework.transaction.annotation.Transactional
21+
import java.time.Instant
1722

1823
@Service
1924
class TransactionService(
2025
private val transactionRepository: TransactionRepository,
2126
private val userRepository: UserRepository,
22-
private val transactionHistoryService: TransactionHistoryService,
27+
private val outboxRepository: TransferEventsOutboxRepository,
28+
private val outboxEventMapper: OutboxEventMapper,
2329
private val idGenerator: IdGenerator,
2430
private val eventPublisher: ApplicationEventPublisher,
2531
) : TransactionRegister {
2632

33+
private val log = LoggerFactory.getLogger(javaClass)
34+
2735
@Transactional
2836
override fun createTransfer(command: TransferRequestCommand): TransferResponseCommand {
29-
val sender = userRepository.findById(command.senderId) ?: throw DomainException(
30-
CommonError.NotFound("account_balance", command.senderId.toString()),
31-
"송신자 정보를 찾을 수 없습니다. senderId=${command.senderId}"
32-
)
33-
34-
val receiver = userRepository.findByAccountNumber(command.receiverAccountNumber) ?: throw DomainException(
35-
CommonError.NotFound("account_balance", command.receiverAccountNumber.toString()),
36-
"수신자 계좌 정보를 찾을 수 없습니다. snowFlakeId=${command.receiverAccountNumber}"
37-
)
37+
val (sender, receiver) = loadUsers(command)
38+
val amount = command.amount()
3839

39-
// TODO: - 테스트의 용이성과 확장성 / 재사용성 검증하기
40-
TransferValidator.validate(sender, receiver, command.amount())
40+
TransferValidator.validate(sender, receiver, amount)
4141

4242
val transaction = Transaction.of(
4343
SnowFlakeId(idGenerator.nextId()),
4444
sender.id,
4545
receiver.id,
46-
command.amount()
46+
amount
4747
)
4848

49-
sender.accountBalance.withdrawOrThrow(command.amount())
50-
receiver.accountBalance.deposit(command.amount())
51-
49+
sender.accountBalance.withdrawOrThrow(amount)
50+
receiver.accountBalance.deposit(amount)
5251
userRepository.save(sender)
5352
userRepository.save(receiver)
5453

55-
val completeEvent = transaction.complete()
5654
val savedTransaction = transactionRepository.save(transaction)
5755

58-
// TODO: outbox ( kafka publish ) + relay 서버를 fadeout 하고, CDC 방식으로 전환.
56+
val completeEvent = transaction.complete()
57+
58+
// outbox 먼저 저장
59+
saveToOutbox(completeEvent, savedTransaction.id.value)
60+
61+
// 이벤트 발행 (커밋 후 별도 스레드에서 Kafka 전송)
5962
eventPublisher.publishEvent(completeEvent)
6063

6164
return TransferResponseCommand.from(savedTransaction)
6265
}
6366

67+
private fun saveToOutbox(event: TransferCompleted, transactionId: Long) {
68+
try {
69+
val outboxEvent = outboxEventMapper.toOutboxEvent(event, transactionId)
70+
outboxRepository.save(outboxEvent, Instant.now())
71+
} catch (e: Exception) {
72+
throw DomainException(
73+
CommonError.Conflict("outbox_save_failed"),
74+
"송금 처리 중 시스템 오류가 발생했습니다.",
75+
e
76+
)
77+
}
78+
}
79+
80+
private fun loadUsers(command: TransferRequestCommand) =
81+
Pair(
82+
userRepository.findById(command.senderId)
83+
?: throw DomainException(
84+
CommonError.NotFound("account_balance", command.senderId.toString()),
85+
"송신자 정보를 찾을 수 없습니다."
86+
),
87+
userRepository.findByAccountNumber(command.receiverAccountNumber)
88+
?: throw DomainException(
89+
CommonError.NotFound("account_balance", command.receiverAccountNumber),
90+
"수신자 계좌 정보를 찾을 수 없습니다."
91+
)
92+
)
93+
94+
@Transactional(readOnly = true)
6495
override fun findTransfer(transactionId: Long): TransferResponseCommand {
6596
val tx = transactionRepository.findById(transactionId)
6697
?: throw DomainException(
@@ -69,5 +100,4 @@ class TransactionService(
69100
)
70101
return TransferResponseCommand.from(tx)
71102
}
72-
73-
}
103+
}
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.github.hyungkishin.transentia.application.handler
22

3-
import io.github.hyungkishin.transentia.application.required.HybridFdsEventPublisher
3+
import io.github.hyungkishin.transentia.application.port.TransferEventPublisher
44
import io.github.hyungkishin.transentia.common.message.transfer.TransferCompleted
55
import org.slf4j.LoggerFactory
66
import org.springframework.scheduling.annotation.Async
@@ -10,31 +10,16 @@ import org.springframework.transaction.event.TransactionalEventListener
1010

1111
@Component
1212
class TransferOutboxEventHandler(
13-
private val hybridFdsEventPublisher: HybridFdsEventPublisher
13+
private val eventPublisher: TransferEventPublisher
1414
) {
15-
1615
private val log = LoggerFactory.getLogger(javaClass)
1716

1817
@Async("outboxEventExecutor")
1918
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
2019
fun handle(event: TransferCompleted) {
20+
log.debug("비동기 Kafka 전송 시도: transactionId={}, eventId={}", event.transactionId, event.eventId)
2121

22-
val currentThread = Thread.currentThread()
23-
24-
val threadConfigData = mapOf(
25-
"threadName" to currentThread.name,
26-
"threadGroup" to (currentThread.threadGroup?.name ?: "N/A"),
27-
"threadId" to currentThread.id.toString(),
28-
"isDaemon" to currentThread.isDaemon.toString()
29-
)
30-
31-
println("threadConfigData: $threadConfigData")
32-
33-
val kafkaSuccess = hybridFdsEventPublisher.publish(event)
34-
35-
if (!kafkaSuccess) {
36-
log.warn("Kafka 즉시 전송 실패, Outbox에 저장됨: transactionId={}", event.transactionId)
37-
}
22+
eventPublisher.publish(event)
3823
}
3924

40-
}
25+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.github.hyungkishin.transentia.application.mapper
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper
4+
import io.github.hyungkishin.transentia.common.message.transfer.TransferCompleted
5+
import io.github.hyungkishin.transentia.container.event.TransferEvent
6+
import org.slf4j.MDC
7+
import org.springframework.stereotype.Component
8+
import java.util.*
9+
10+
@Component
11+
class OutboxEventMapper(
12+
private val objectMapper: ObjectMapper
13+
) {
14+
fun toOutboxEvent(event: TransferCompleted, transactionId: Long): TransferEvent {
15+
return TransferEvent(
16+
eventId = event.eventId,
17+
aggregateType = "Transaction",
18+
aggregateId = transactionId.toString(),
19+
eventType = "TRANSFER_COMPLETED",
20+
payload = objectMapper.writeValueAsString(
21+
mapOf(
22+
"transactionId" to event.transactionId,
23+
"senderId" to event.senderUserId,
24+
"receiverId" to event.receiverUserId,
25+
"amount" to event.amount,
26+
"status" to "COMPLETED",
27+
"occurredAt" to event.occurredAt.toEpochMilli()
28+
)
29+
),
30+
headers = objectMapper.writeValueAsString(
31+
mapOf(
32+
"eventType" to "TRANSFER_COMPLETED",
33+
"eventVersion" to "v1",
34+
"traceId" to (MDC.get("traceId") ?: UUID.randomUUID().toString()),
35+
"producer" to "transfer-api",
36+
"contentType" to "application/json"
37+
)
38+
)
39+
)
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.github.hyungkishin.transentia.application.port
2+
3+
import io.github.hyungkishin.transentia.common.message.transfer.TransferCompleted
4+
5+
/**
6+
* 송금 이벤트 발행 Port
7+
*/
8+
interface TransferEventPublisher {
9+
/**
10+
* 송금 완료 이벤트 발행
11+
*
12+
* 호출자가 이미 비동기 스레드에서 실행 중
13+
*/
14+
fun publish(event: TransferCompleted)
15+
}

services/transfer/application/src/main/kotlin/io/github/hyungkishin/transentia/application/required/TransferEventsOutboxRepository.kt

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,33 +9,18 @@ interface TransferEventsOutboxRepository {
99
fun save(row: TransferEvent, now: Instant)
1010

1111
/**
12-
* 처리할 이벤트들을 배치로 조회하고 SENDING 상태로 변경한다.
12+
* 처리 대기 중인 이벤트를 조회하고 SENDING 상태로 변경
1313
*
14-
* 여러 스레드나 프로세스가 동시에 실행되어도 안전하도록 SKIP LOCKED를 사용한다.
15-
* Stuck SENDING 상태(stuckThresholdSeconds 이상 진행 중)인 이벤트도 자동으로 복구하여 처리한다.
16-
* 우선순위는 PENDING > SENDING(Stuck) > FAILED 순으로 처리한다.
17-
*
18-
* @param limit 한 번에 처리할 최대 이벤트 수
19-
* @param now 기준 시간 (기본값: 현재 시간, 테스트 시 고정 시간 주입 가능)
20-
* @param stuckThresholdSeconds Stuck SENDING 판단 기준 시간 (초)
21-
* @return 처리할 이벤트 목록
14+
* SKIP LOCKED로 동시성 제어
15+
* 우선순위: PENDING > SENDING(Stuck) > FAILED
2216
*/
2317
fun claimBatch(
2418
limit: Int,
2519
now: Instant,
26-
stuckThresholdSeconds: Long = 600
20+
sendingTimeoutSeconds: Long = 120
2721
): List<ClaimedRow>
2822

29-
fun markAsPublished(
30-
ids: List<Long>,
31-
now: Instant,
32-
)
33-
34-
fun markFailedWithBackoff(
35-
id: Long,
36-
cause: String?,
37-
backoffMillis: Long,
38-
now: Instant,
39-
)
23+
fun markAsPublished(ids: List<Long>, now: Instant)
4024

25+
fun markFailedWithBackoff(id: Long, cause: String?, backoffMillis: Long, now: Instant)
4126
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.github.hyungkishin.transentia.infra.adapter
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper
4+
import io.github.hyungkishin.transentia.application.port.TransferEventPublisher
5+
import io.github.hyungkishin.transentia.application.required.TransferEventsOutboxRepository
6+
import io.github.hyungkishin.transentia.common.message.transfer.TransferCompleted
7+
import io.github.hyungkishin.transentia.infrastructure.kafka.model.TransferEventAvroModel
8+
import io.github.hyungkishin.transentia.infrastructure.kafka.model.TransferEventType
9+
import io.github.hyungkishin.transentia.infrastructure.kafka.model.TransferStatus
10+
import io.github.hyungkishin.transentia.infrastructure.kafka.producer.service.KafkaProducer
11+
import org.slf4j.LoggerFactory
12+
import org.slf4j.MDC
13+
import org.springframework.beans.factory.annotation.Value
14+
import org.springframework.stereotype.Component
15+
import java.time.Instant
16+
import java.util.*
17+
18+
@Component
19+
class KafkaTransferEventPublisher(
20+
private val kafkaProducer: KafkaProducer<String, TransferEventAvroModel>,
21+
private val outboxRepository: TransferEventsOutboxRepository,
22+
private val objectMapper: ObjectMapper,
23+
@Value("\${app.kafka.topics.transfer-events}") private val topicName: String
24+
) : TransferEventPublisher {
25+
26+
private val log = LoggerFactory.getLogger(javaClass)
27+
28+
override fun publish(event: TransferCompleted) {
29+
try {
30+
val avroModel = TransferEventAvroModel.newBuilder()
31+
.setEventId(event.eventId)
32+
.setEventType(TransferEventType.TRANSFER_COMPLETED)
33+
.setAggregateId(event.transactionId.toString())
34+
.setTransactionId(event.transactionId)
35+
.setSenderId(event.senderUserId)
36+
.setReceiverId(event.receiverUserId)
37+
.setAmount(event.amount.toString())
38+
.setStatus(TransferStatus.COMPLETED)
39+
.setOccurredAt(event.occurredAt.toEpochMilli())
40+
.setHeaders(
41+
objectMapper.writeValueAsString(
42+
mapOf(
43+
"eventType" to "TRANSFER_COMPLETED",
44+
"eventVersion" to "v1",
45+
"traceId" to (MDC.get("traceId") ?: UUID.randomUUID().toString()),
46+
"producer" to "transfer-api",
47+
"contentType" to "application/json"
48+
)
49+
)
50+
)
51+
.setCreatedAt(System.currentTimeMillis())
52+
.build()
53+
54+
kafkaProducer.sendSync(topicName, avroModel)
55+
56+
outboxRepository.markAsPublished(listOf(event.eventId), Instant.now())
57+
log.debug("Kafka 전송 및 outbox PUBLISHED 완료: eventId={}", event.eventId)
58+
59+
} catch (e: Exception) {
60+
log.warn("Kafka 전송 실패 (relay 재시도): eventId={}, error={}", event.eventId, e.message)
61+
}
62+
}
63+
}

0 commit comments

Comments
 (0)