
Commit 1ba6a6c

feat: migrate the relay-server from the single-instance multi-thread strategy to Spring Batch
1 parent 82eedde commit 1ba6a6c


37 files changed: +1053 / -726 lines


common/common-domain/src/main/kotlin/io/github/hyungkishin/transentia/common/outbox/transfer/ClaimedRow.kt

Lines changed: 0 additions & 1 deletion
@@ -2,7 +2,6 @@ package io.github.hyungkishin.transentia.common.outbox.transfer
 
 data class ClaimedRow(
     val eventId: Long,
-    val aggregateId: String,
     val payload: String,
     val headers: String,
     val attemptCount: Int = 0
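
The commit title says the relay-server's single-instance multi-thread strategy is being moved to Spring Batch, but none of that job code appears in the hunks shown here. Purely for orientation, a minimal chunk-oriented sketch of what a relay step over ClaimedRow could look like; Spring Batch 5 style is assumed, and outboxClaimReader, kafkaTemplate, and the bean names are hypothetical, not code from this commit.

import io.github.hyungkishin.transentia.common.outbox.transfer.ClaimedRow
import org.springframework.batch.core.Job
import org.springframework.batch.core.Step
import org.springframework.batch.core.job.builder.JobBuilder
import org.springframework.batch.core.repository.JobRepository
import org.springframework.batch.core.step.builder.StepBuilder
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemWriter
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.transaction.PlatformTransactionManager

// Sketch only: claims unpublished outbox rows and relays them to Kafka in chunks.
@Configuration
class OutboxRelayJobConfig(
    private val jobRepository: JobRepository,
    private val transactionManager: PlatformTransactionManager,
    // Assumed to be provided elsewhere, e.g. a JDBC paging reader that claims pending rows.
    private val outboxClaimReader: ItemReader<ClaimedRow>,
    // Hypothetical wiring; the real relay may publish differently.
    private val kafkaTemplate: KafkaTemplate<String, String>,
) {

    @Bean
    fun outboxRelayJob(outboxRelayStep: Step): Job =
        JobBuilder("outboxRelayJob", jobRepository)
            .start(outboxRelayStep)
            .build()

    @Bean
    fun outboxRelayStep(): Step =
        StepBuilder("outboxRelayStep", jobRepository)
            .chunk<ClaimedRow, ClaimedRow>(100, transactionManager)
            .reader(outboxClaimReader)
            .writer(kafkaWriter())
            .build()

    private fun kafkaWriter(): ItemWriter<ClaimedRow> = ItemWriter { rows ->
        rows.forEach { row ->
            // Key by eventId so per-event ordering stays stable on the topic.
            kafkaTemplate.send("transfer-transaction-events", row.eventId.toString(), row.payload)
        }
    }
}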

docker-compose.yml

Lines changed: 2 additions & 2 deletions
@@ -82,8 +82,8 @@ services:
     command: >
       bash -c "
       /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic transfer-complete-events --partitions 8 --replication-factor 1 &&
-      /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic transfer-transaction-events --partitions 3 --replication-factor 1 &&
-      echo 'topics created'
+      /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic transfer-transaction-events --partitions 8 --replication-factor 1 &&
+      echo 'Topics created: transfer-complete-events (8 partitions), transfer-transaction-events (8 partitions) - for 2000 TPS target'
       "
     restart: "no"

docs/etc/송금도메인 이벤트 정리.md

Lines changed: 1 addition & 2 deletions
@@ -51,9 +51,8 @@ This event is strictly required on the FDS (anomalous-transaction detection) side.
 - Afterwards, whether it has been published is determined only by published_at
 
 ## Fields stored in the Outbox
-- `event_id` : Snowflake-based ID
+- `event_id` : Snowflake-based ID (Transaction ID)
 - `aggregate_type` : "Transfer" (which Aggregate the event belongs to)
-- `aggregate_id` : Transaction ID
 - `event_type` : "TransferRequested", "TransferCompleted", "TransferFailed"
 - `payload` : the JSON-serialized event body described above
 - `headers` : traceId, correlationId, etc.
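
For orientation, a minimal Kotlin sketch of a row shaped like the field list above; the name TransferOutboxRecord and the publishedAt property type are illustrative, not taken from this commit.

import java.time.Instant

// Illustrative only - mirrors the field list in the doc, not the actual entity.
data class TransferOutboxRecord(
    val eventId: Long,                // Snowflake-based ID, doubling as the Transaction ID
    val aggregateType: String,        // "Transfer"
    val eventType: String,            // "TransferRequested" | "TransferCompleted" | "TransferFailed"
    val payload: String,              // JSON-serialized event body
    val headers: String,              // traceId, correlationId, etc.
    val publishedAt: Instant? = null, // null until the relay publishes the event
)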

infrastructure/kafka/kafka-consumer/src/main/kotlin/io/github/hyungkishin/transentia/infrastructure/kafka/consumer/config/KafkaConsumerConfig.kt

Lines changed: 56 additions & 5 deletions
@@ -11,6 +11,7 @@ import org.springframework.kafka.config.KafkaListenerContainerFactory
 import org.springframework.kafka.core.ConsumerFactory
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory
 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
+import org.springframework.kafka.listener.ContainerProperties
 import java.io.Serializable
 
 @Configuration
@@ -22,21 +23,50 @@ class KafkaConsumerConfig<K : Serializable, V : SpecificRecordBase>(
     @Bean
     fun consumerConfigs(): Map<String, Any> {
         return mutableMapOf<String, Any>().apply {
+            // Basic settings
             put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigData.bootstrapServers)
             put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerConfigData.consumerGroupId)
             put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConsumerConfigData.keyDeserializer)
             put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConsumerConfigData.valueDeserializer)
             put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerConfigData.autoOffsetReset)
+
+            // Avro settings
             put(kafkaConfigData.schemaRegistryUrlKey, kafkaConfigData.schemaRegistryUrl)
             put(kafkaConsumerConfigData.specificAvroReaderKey, kafkaConsumerConfigData.specificAvroReader)
+
+            // Consumer group management
             put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConsumerConfigData.sessionTimeoutMs)
             put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaConsumerConfigData.heartbeatIntervalMs)
             put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerConfigData.maxPollIntervalMs)
-            put(
-                ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
-                kafkaConsumerConfigData.maxPartitionFetchBytesDefault * kafkaConsumerConfigData.maxPartitionFetchBytesBoostFactor
+
+            // Fetch settings
+            put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
+                kafkaConsumerConfigData.maxPartitionFetchBytesDefault *
+                    kafkaConsumerConfigData.maxPartitionFetchBytesBoostFactor
             )
             put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerConfigData.maxPollRecords)
+
+            // Minimum fetch size: 1KB
+            // - the broker waits until at least this much data has accumulated
+            // - too small adds network overhead, too large adds latency
+            put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024)
+
+            // Maximum fetch wait: 500ms
+            // - respond after this time even if fetch.min.bytes has not been reached
+            // - balances latency against throughput
+            put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500)
+
+            // Disable auto commit (manual control)
+            // - commits are driven by Spring Kafka's AckMode instead
+            put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
+
+            // Isolation level: read_committed
+            // - only messages from committed transactions are read
+            // - guarantees data consistency
+            put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
+
+            // Client ID (for monitoring)
+            put(ConsumerConfig.CLIENT_ID_CONFIG, "fds-consumer-\${spring.application.name}")
         }
     }
@@ -45,14 +75,35 @@ class KafkaConsumerConfig<K : Serializable, V : SpecificRecordBase>(
         return DefaultKafkaConsumerFactory(consumerConfigs())
     }
 
+    /**
+     * Kafka listener container factory for single-event processing
+     *
+     * - Batch listener: false (one event at a time)
+     * - Concurrency: 8 (one thread per partition)
+     * - AckMode: MANUAL_IMMEDIATE (manual commit, applied immediately)
+     */
     @Bean
     fun kafkaListenerContainerFactory(): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>> {
         val factory = ConcurrentKafkaListenerContainerFactory<K, V>()
+
         factory.consumerFactory = consumerFactory()
+
+        // Single-event processing
         factory.isBatchListener = kafkaConsumerConfigData.batchListener
+
+        // Concurrency (keep it equal to the partition count)
         factory.setConcurrency(kafkaConsumerConfigData.concurrencyLevel)
+
+        // Auto startup
         factory.setAutoStartup(kafkaConsumerConfigData.autoStartup)
-        factory.containerProperties.pollTimeout = kafkaConsumerConfigData.pollTimeoutMs
+
+        // Container properties
+        factory.containerProperties.apply {
+            pollTimeout = kafkaConsumerConfigData.pollTimeoutMs
+            ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
+        }
+
         return factory
     }
-}
+
+}
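
With MANUAL_IMMEDIATE, a listener that rethrows (as TransferKafkaListener below does) leaves the offset uncommitted, so the record is redelivered; the commit does not show any retry bound or dead-letter routing. A minimal sketch of how that could be bounded on this factory, if desired; the DeadLetterPublishingRecoverer/KafkaTemplate wiring here is an assumption, not something in this commit.

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.FixedBackOff

// Sketch only: retry a failed record a few times, then route it to "<topic>.DLT".
@Configuration
class FdsErrorHandlingConfig {

    @Bean
    fun fdsErrorHandler(kafkaTemplate: KafkaTemplate<Any, Any>): DefaultErrorHandler {
        val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate)
        // 1s between attempts, at most 3 retries after the initial failure
        return DefaultErrorHandler(recoverer, FixedBackOff(1000L, 3L))
    }
}

Such a handler would then be attached inside kafkaListenerContainerFactory() via factory.setCommonErrorHandler(fdsErrorHandler); without it, a poison message is redelivered indefinitely.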

infrastructure/kafka/kafka-model/src/main/resources/avro/transfer_event.avsc

Lines changed: 0 additions & 5 deletions
@@ -18,11 +18,6 @@
       },
       "doc": "Type of transfer event"
     },
-    {
-      "name": "aggregateId",
-      "type": "string",
-      "doc": "Transaction aggregate ID as string"
-    },
     {
       "name": "transactionId",
       "type": "long",

services/fds/application/src/main/kotlin/io/github/hyungkishin/transentia/application/service/AnalyzeTransferService.kt

Lines changed: 2 additions & 0 deletions
@@ -27,6 +27,8 @@ class AnalyzeTransferService(
     // TODO: edge cases -> alerting + logging + model training + admin handling!
     // Did every bank really build this in-house? There should be established patterns for detecting anomalous transactions.
 
+    // NOTE: a Hive-style big-data platform <- the evidence base for the data
+    // Deriving the relationship graph from 10 years of full account history across unspecified counterparties in a query-based way (high cost when performance requirements rise)
     // LAG + LLM
 
     // Fetch all active rules

services/fds/infra/src/main/kotlin/io/github/hyungkishin/transentia/infra/event/TransferKafkaListener.kt

Lines changed: 37 additions & 39 deletions
@@ -2,68 +2,66 @@ package io.github.hyungkishin.transentia.infra.event
 
 import io.github.hyungkishin.transentia.application.service.AnalyzeTransferService
 import io.github.hyungkishin.transentia.infrastructure.kafka.model.TransferEventAvroModel
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.slf4j.LoggerFactory
-import org.springframework.beans.factory.annotation.Value
 import org.springframework.kafka.annotation.KafkaListener
-import org.springframework.messaging.handler.annotation.Headers
+import org.springframework.kafka.support.Acknowledgment
+import org.springframework.kafka.support.KafkaHeaders
+import org.springframework.messaging.handler.annotation.Header
 import org.springframework.messaging.handler.annotation.Payload
 import org.springframework.stereotype.Component
 
 @Component
 class TransferKafkaListener(
-    @Value("\${app.transfer.topic}") private val transferTopic: String,
     private val analyzeTransferService: AnalyzeTransferService,
     private val transferEventMapper: TransferEventMapper,
 ) {
     private val log = LoggerFactory.getLogger(javaClass)
 
-    /**
-     * TODO: performance on the event-processing side
-     * TODO: risks around duplicate message processing (currently too risky) -> safeguards
-     * TODO: check whether offsets are actually updated (verify with Big Data Tools)
-     *
-     * - how to balance throughput between the producing and consuming sides
-     * - how many producers, how many consumers, how many partitions
-     */
     @KafkaListener(
         id = "\${kafka-consumer-config.consumer-group-id}",
         topics = ["\${app.transfer.topic}"],
+        containerFactory = "kafkaListenerContainerFactory"
     )
     fun receive(
-        @Payload messages: List<TransferEventAvroModel>,
-        @Headers headers: Map<String, Any>
+        @Payload message: TransferEventAvroModel,
+        @Header(KafkaHeaders.RECEIVED_PARTITION) partition: Int,
+        @Header(KafkaHeaders.OFFSET) offset: Long,
+        @Header(value = "eventType", required = false) eventType: String?,
+        @Header(value = "X-Trace-Id", required = false) traceId: String?,
+        consumerRecord: ConsumerRecord<String, TransferEventAvroModel>,
+        acknowledgment: Acknowledgment?
     ) {
-        val eventType = headers["eventType"]?.toString()
-        val traceId = headers["X-Trace-Id"]?.toString()
+        try {
+            log.debug(
+                "[FDS-Consumer] Received - partition={} offset={} eventId={} traceId={}",
+                partition, offset, message.eventId, traceId
+            )
 
-        log.info("@@@@@[FDS-Consumer] RECEIVED {} messages, traceId={}", messages.size, traceId)
+            // Convert to a domain event
+            val domainEvent = transferEventMapper.toDomain(message)
 
-        // TODO: verify offset behavior
-        messages.forEach { avroMessage ->
-            try {
-                log.info(
-                    "@@@@@[FDS-Consumer] Processing eventId={} amount={} status={}",
-                    avroMessage.eventId, avroMessage.amount, avroMessage.status
-                )
+            // Run the FDS analysis
+            val riskLog = analyzeTransferService.analyze(domainEvent)
 
-                val domainEvent = transferEventMapper.toDomain(avroMessage)
+            log.info(
+                "[FDS-Consumer] Analysis complete - eventId={} decision={} hits={}",
+                domainEvent.eventId,
+                riskLog.decision,
+                riskLog.ruleHits.size,
+            )
 
-                val riskLog = analyzeTransferService.analyze(domainEvent)
+            // Manual commit (when AckMode is MANUAL_IMMEDIATE)
+            acknowledgment?.acknowledge()
 
-                log.info(
-                    "[FDS-Consumer] Analysis complete - eventId={} decision={} hits={}",
-                    domainEvent.eventId, riskLog.decision, riskLog.ruleHits.size
-                )
-                // TODO: issues when Thread.sleep is added -> clearer to measure across multiple instances.
-                // TODO: Docker -> 3 instances -> check the logs
-
-            } catch (e: Exception) {
-                // TODO: confirm how Kafka failures are handled when an exception occurs
-                // TODO: confirm the impact on the Kafka side
-                log.error("[FDS-Consumer] Analysis failed - eventId={}", avroMessage.eventId, e)
-                // Propagate the exception for reprocessing
-                throw e
-            }
+        } catch (e: Exception) {
+            log.error(
+                "[FDS-Consumer] Analysis failed - partition={} offset={} eventId={} error={}",
+                partition, offset, message.eventId, e.message, e
+            )
+            // Propagate the exception so the message is reprocessed
+            throw e
         }
     }
+
 }

services/fds/instances/api/src/main/resources/application.yml

Lines changed: 25 additions & 12 deletions
@@ -9,12 +9,25 @@ spring:
     username: postgres
     password: pass1234
     driver-class-name: org.postgresql.Driver
+    # HikariCP tuning
+    hikari:
+      maximum-pool-size: 20
+      minimum-idle: 10
+      connection-timeout: 30000
+      idle-timeout: 600000
+      max-lifetime: 1800000
 
   jpa:
     open-in-view: false
     hibernate:
       ddl-auto: none
-    show-sql: true
+    show-sql: false
+    properties:
+      hibernate:
+        jdbc:
+          batch_size: 20
+        order_inserts: true
+        order_updates: true
 
   flyway:
     enabled: false
@@ -23,35 +36,35 @@ spring:
 
 logging:
   level:
-    org.hibernate.SQL: DEBUG
-    org.hibernate.type.descriptor.sql.BasicBinder: TRACE
+    org.hibernate.SQL: INFO
+    org.hibernate.type.descriptor.sql.BasicBinder: INFO
     org.springframework.kafka: INFO
-    io.github.hyungkishin.transentia: DEBUG
+    io.github.hyungkishin.transentia: INFO
 
 kafka-config:
   bootstrap-servers: host.docker.internal:9094
   schema-registry-url-key: schema.registry.url
   schema-registry-url: http://localhost:8085
-  num-of-partitions: 8
+  num-of-partitions: 3
   replication-factor: 1
-
 kafka-consumer-config:
   key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
   value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
   consumer-group-id: fds-consumer-group
   auto-offset-reset: earliest
   specific-avro-reader-key: specific.avro.reader
  specific-avro-reader: true
-  batch-listener: true
+
+  batch-listener: false
   auto-startup: true
-  concurrency-level: 2 # 4 for a 2000 TPS target
+  concurrency-level: 3
+  max-poll-records: 100
+  max-partition-fetch-bytes-default: 1048576
+  max-partition-fetch-bytes-boost-factor: 1
+  poll-timeout-ms: 1000
   session-timeout-ms: 10000
   heartbeat-interval-ms: 3000
   max-poll-interval-ms: 300000
-  max-poll-records: 500
-  max-partition-fetch-bytes-default: 1048576
-  max-partition-fetch-bytes-boost-factor: 1
-  poll-timeout-ms: 500
 
 app:
   transfer:
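
For reference, the kafka-consumer-config keys above are read in KafkaConsumerConfig through a kafkaConsumerConfigData object; a minimal sketch of what such a binding could look like. The property names are inferred from the config class and the YAML, while the class name and the @ConfigurationProperties wiring are assumptions rather than code from this commit.

import org.springframework.boot.context.properties.ConfigurationProperties

// Sketch only: mirrors the kafka-consumer-config block in application.yml.
@ConfigurationProperties(prefix = "kafka-consumer-config")
data class KafkaConsumerConfigData(
    val keyDeserializer: String,
    val valueDeserializer: String,
    val consumerGroupId: String,
    val autoOffsetReset: String,
    val specificAvroReaderKey: String,
    val specificAvroReader: Boolean,
    val batchListener: Boolean,
    val autoStartup: Boolean,
    val concurrencyLevel: Int,
    val maxPollRecords: Int,
    val maxPartitionFetchBytesDefault: Int,
    val maxPartitionFetchBytesBoostFactor: Int,
    val pollTimeoutMs: Long,
    val sessionTimeoutMs: Int,
    val heartbeatIntervalMs: Int,
    val maxPollIntervalMs: Int,
)

Constructor binding like this also assumes @ConfigurationPropertiesScan (or @EnableConfigurationProperties) somewhere in the application; the real project may bind these values differently.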