Skip to content

Commit 1d0a998

Browse files
committed
fix: spring batch 컨셉의 multiThread 반영
1 parent 8b8fcac commit 1d0a998

File tree

11 files changed

+319
-267
lines changed

11 files changed

+319
-267
lines changed

services/fds/application/src/main/kotlin/io/github/hyungkishin/transentia/application/service/AnalyzeTransferService.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ class AnalyzeTransferService(
2727
// TODO: 엣지케이스 -> 알림 + log 성 + 학습 + 관리자 !
2828
// 과연 은행사마다 만들었을까 ? 이상감지를 탐지해주는 패턴이 있을것이다.
2929

30+
// NOTE : Hive 류의 빅데이터 플랫폼 <- 데이터의 근거
31+
// 10년치 계좌의 모든 계좌 이력의 전체 -> 불특정 다수 -> 관계도를 -> queryBase 로 찾을 경우 ( 성능 up 비용이 높을때다. )
3032
// LAG + LLM
3133

3234
// 모든 활성화된 룰 조회

services/fds/infra/src/main/kotlin/io/github/hyungkishin/transentia/infra/event/TransferKafkaListener.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,14 @@ class TransferKafkaListener(
4646
avroMessage.eventId, avroMessage.amount, avroMessage.status
4747
)
4848

49+
// threadPool 배경 -> 현대
50+
// GC -> 자바에서 / redis | <- CS
51+
52+
// batch -> SCDF
53+
4954
val domainEvent = transferEventMapper.toDomain(avroMessage)
5055

56+
// offset commit 을 할 수 있나 ?
5157
val riskLog = analyzeTransferService.analyze(domainEvent)
5258

5359
log.info(

services/transfer/application/src/main/kotlin/io/github/hyungkishin/transentia/application/TransactionService.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class TransactionService(
5757
saveToOutbox(completeEvent)
5858

5959
// 이벤트 발행 (커밋 후 별도 스레드에서 Kafka 전송) - @see TransferOutboxEventHandler
60-
// eventPublisher.publishEvent(completeEvent)
60+
eventPublisher.publishEvent(completeEvent)
6161

6262
return TransferResponseCommand.from(savedTransaction)
6363
}

services/transfer/application/src/main/kotlin/io/github/hyungkishin/transentia/application/required/TransferEventsOutboxRepository.kt

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,59 @@ import io.github.hyungkishin.transentia.common.outbox.transfer.ClaimedRow
44
import io.github.hyungkishin.transentia.container.event.TransferEvent
55
import java.time.Instant
66

7+
/**
8+
* Transfer Events Outbox Repository
9+
*
10+
* 단순 구조:
11+
* 1. save: Outbox INSERT
12+
* 2. claimBatch: PENDING/SENDING(stuck) → SENDING (watchdog 설정)
13+
* 3. markAsPublished: SENDING → PUBLISHED
14+
* 4. markForRetry: SENDING → PENDING (재시도)
15+
* 5. markAsDeadLetter: * → DEAD_LETTER (최종)
16+
*/
717
interface TransferEventsOutboxRepository {
818

919
fun save(row: TransferEvent, now: Instant)
1020

1121
/**
12-
* 처리 대기 중인 이벤트를 조회하고 SENDING 상태로 변경
13-
*
14-
* SKIP LOCKED로 동시성 제어
15-
* 우선순위: PENDING > SENDING(Stuck) > FAILED
22+
* 처리 대기 중인 이벤트 조회 및 claim
23+
*
24+
* - PENDING → SENDING (attempt + 1, watchdog 설정)
25+
* - SENDING(stuck) → SENDING (attempt 유지, watchdog 재설정)
26+
* - watchdog: next_retry_at = now + sendingTimeoutSeconds
27+
*
28+
* @param limit 조회 건수
29+
* @param now 현재 시각
30+
* @param sendingTimeoutSeconds SENDING 타임아웃 (초)
1631
*/
1732
fun claimBatch(
1833
limit: Int,
1934
now: Instant,
2035
sendingTimeoutSeconds: Long = 120
2136
): List<ClaimedRow>
2237

38+
/**
39+
* Kafka 발행 성공
40+
*/
2341
fun markAsPublished(ids: List<Long>, now: Instant)
2442

25-
fun markFailedWithBackoff(id: Long, cause: String?, backoffMillis: Long, now: Instant)
43+
/**
44+
* 재시도 예약
45+
*
46+
* SENDING → PENDING
47+
*/
48+
fun markForRetry(
49+
eventId: Long,
50+
attemptCount: Int,
51+
nextRetryAt: Instant,
52+
error: String?,
53+
now: Instant
54+
)
55+
56+
/**
57+
* DEAD_LETTER 전환
58+
*
59+
* maxAttempts 초과 시
60+
*/
61+
fun markAsDeadLetter(eventId: Long, error: String?, now: Instant)
2662
}

services/transfer/infra/src/main/kotlin/io/github/hyungkishin/transentia/infra/rdb/adapter/TransferEventsOutboxJdbcRepository.kt

Lines changed: 53 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ class TransferEventsOutboxJdbcRepository(
2828
'PENDING', 0,
2929
:now, :now, :now
3030
)
31-
ON CONFLICT (event_id) DO NOTHING
31+
ON CONFLICT (event_id) DO UPDATE
32+
SET status = EXCLUDED.status,
33+
updated_at = EXCLUDED.updated_at
3234
""".trimIndent()
3335

3436
jdbc.update(
@@ -44,63 +46,42 @@ class TransferEventsOutboxJdbcRepository(
4446
}
4547

4648
/**
47-
* 처리 대기 중인 이벤트를 조회하고 SENDING 상태로 변경
48-
*
49-
* SKIP LOCKED로 동시성 제어
50-
* 우선순위: PENDING > SENDING(Stuck)
49+
* Phase 1: 간단한 Claim (PENDING만 조회)
50+
*
51+
* - FOR UPDATE SKIP LOCKED로 동시성 제어
52+
* - SENDING 상태 없이 PENDING만 사용
53+
* - 조회 후 바로 처리
5154
*/
5255
override fun claimBatch(
5356
limit: Int,
5457
now: Instant,
5558
sendingTimeoutSeconds: Long
5659
): List<ClaimedRow> {
57-
val stuckThreshold = Timestamp.from(now.minusSeconds(sendingTimeoutSeconds))
58-
val gracePeriod = Timestamp.from(now.minusSeconds(30))
5960
val currentTime = Timestamp.from(now)
6061

6162
val sql = """
62-
WITH grabbed AS (
63-
SELECT event_id
63+
SELECT
64+
event_id,
65+
payload::text AS payload,
66+
headers::text AS headers,
67+
attempt_count
6468
FROM transfer_events
65-
WHERE (
66-
-- PENDING이면서 Main이 처리할 시간(30초) 지남
67-
(status = 'PENDING' AND created_at < :gracePeriod)
68-
-- 또는 SENDING인데 타임아웃 (Worker 죽은 경우)
69-
OR (status = 'SENDING' AND updated_at < :stuckThreshold)
70-
)
71-
AND attempt_count < 5
72-
ORDER BY created_at
69+
WHERE status = 'PENDING'
70+
AND (next_retry_at IS NULL OR next_retry_at <= :now)
71+
AND attempt_count < 5
72+
ORDER BY next_retry_at NULLS FIRST, created_at
7373
FOR UPDATE SKIP LOCKED
7474
LIMIT :limit
75-
)
76-
UPDATE transfer_events t
77-
SET status = 'SENDING',
78-
attempt_count = CASE
79-
WHEN t.status = 'SENDING' THEN t.attempt_count
80-
ELSE t.attempt_count + 1
81-
END,
82-
updated_at = :now
83-
FROM grabbed g
84-
WHERE t.event_id = g.event_id
85-
RETURNING t.event_id,
86-
t.payload::text AS payload,
87-
t.headers::text AS headers,
88-
t.attempt_count
89-
""".trimIndent()
75+
""".trimIndent()
9076

9177
return jdbc.query(
9278
sql, mapOf(
9379
"limit" to limit,
94-
"now" to currentTime,
95-
"stuckThreshold" to stuckThreshold,
96-
"gracePeriod" to gracePeriod
80+
"now" to currentTime
9781
), claimedRowMapper
9882
)
9983
}
10084

101-
/**
102-
* Kafka 발행 성공한 이벤트를 PUBLISHED로 변경
103-
*/
10485
override fun markAsPublished(
10586
ids: List<Long>,
10687
now: Instant
@@ -115,6 +96,7 @@ class TransferEventsOutboxJdbcRepository(
11596
published_at = :now,
11697
updated_at = :now
11798
WHERE event_id IN (:ids)
99+
AND status = 'PENDING'
118100
""".trimIndent()
119101

120102
jdbc.update(
@@ -125,36 +107,51 @@ class TransferEventsOutboxJdbcRepository(
125107
)
126108
}
127109

128-
override fun markFailedWithBackoff(
129-
id: Long,
130-
cause: String?,
131-
backoffMillis: Long,
110+
override fun markForRetry(
111+
eventId: Long,
112+
attemptCount: Int,
113+
nextRetryAt: Instant,
114+
error: String?,
132115
now: Instant
133116
) {
134-
val currentTime = Timestamp.from(now)
135-
val nextRetry = Timestamp.from(now.plusMillis(backoffMillis))
136-
137117
val sql = """
138118
UPDATE transfer_events
139-
SET status = CASE
140-
WHEN attempt_count >= 5 THEN 'DEAD_LETTER'::transfer_outbox_status
141-
ELSE 'SENDING'::transfer_outbox_status
142-
END,
143-
error_message = :errorMessage,
144-
updated_at = :now,
145-
next_retry_at = :nextRetry
119+
SET status = 'PENDING',
120+
attempt_count = :attemptCount,
121+
next_retry_at = :nextRetryAt,
122+
error_message = :error,
123+
updated_at = :now
146124
WHERE event_id = :eventId
125+
AND status = 'PENDING'
147126
""".trimIndent()
148127

149128
jdbc.update(
150129
sql, mapOf(
151-
"eventId" to id,
152-
"errorMessage" to (cause ?: "UNKNOWN"),
153-
"now" to currentTime,
154-
"nextRetry" to nextRetry
130+
"eventId" to eventId,
131+
"attemptCount" to attemptCount,
132+
"nextRetryAt" to Timestamp.from(nextRetryAt),
133+
"error" to error,
134+
"now" to Timestamp.from(now)
155135
)
156136
)
137+
}
157138

139+
override fun markAsDeadLetter(eventId: Long, error: String?, now: Instant) {
140+
val sql = """
141+
UPDATE transfer_events
142+
SET status = 'DEAD_LETTER',
143+
error_message = :error,
144+
updated_at = :now
145+
WHERE event_id = :eventId
146+
""".trimIndent()
147+
148+
jdbc.update(
149+
sql, mapOf(
150+
"eventId" to eventId,
151+
"error" to error,
152+
"now" to Timestamp.from(now)
153+
)
154+
)
158155
}
159156

160157
private val claimedRowMapper = RowMapper { rs, _ ->

0 commit comments

Comments
 (0)