Skip to content

Commit 861cefc

Browse files
committed
faet: single instance multiThread 전략의 relay-server 를 spring batch 로 이관
1 parent 1d0a998 commit 861cefc

File tree

29 files changed

+1115
-531
lines changed

29 files changed

+1115
-531
lines changed

docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ services:
8282
command: >
8383
bash -c "
8484
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic transfer-complete-events --partitions 8 --replication-factor 1 &&
85-
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic transfer-transaction-events --partitions 3 --replication-factor 1 &&
86-
echo 'topics created'
85+
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic transfer-transaction-events --partitions 8 --replication-factor 1 &&
86+
echo 'Topics created: transfer-complete-events (8 partitions), transfer-transaction-events (8 partitions) - for 2000 TPS target'
8787
"
8888
restart: "no"
8989

docs/etc/test.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# 목표
2+
-
3+
4+
## kafka Streams
5+
단일이벤트로 -> 어딘가에서 모아서 (그루핑) -> 이벤트
6+
7+
8+
from (10) -> to (1)
9+
> - 이벤트에 순서를 보장하는 기법
10+
11+
## 이벤트 저마다의 성격
12+
- 필터 가 되는 "것"
13+
- 정상적이지 않으면 다음으로 넘기지 않는것.
14+
15+
## 이벤트 레이어
16+
- 확인 필요
17+
18+
## 하둡
19+
- 확인 필요
20+
-
21+
22+
## 제 2의 묶어서 처리
23+
- https://tv.naver.com/v/34028236
24+
25+
## spring cloude streams
26+
-
27+
28+
## 이벤트 발행의 숙련도
29+
30+
## 결론
31+
> - 한번의 이벤트에서 모든 비즈니스 로직을 처리하려다 보니, 이벤트에서 처리해야 될 역할이 모호해졌다.
32+
> - 가령 List 로 받아야 되지 않을까 -> 500 건까지 받으면 501번은 짤리게 되고..
33+
> - 그래서 하나의 이벤트를 받아서 처리하는 역할을 <- 단일 이벤트로 받아서 처리하고
34+
> - 이벤트 들을 그룹화 해서 제 2 의 비즈니스로 풀어내는 방식인 cloude stream 이라는 기법이 필요할것 같다.

infrastructure/kafka/kafka-consumer/src/main/kotlin/io/github/hyungkishin/transentia/infrastructure/kafka/consumer/config/KafkaConsumerConfig.kt

Lines changed: 97 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import org.springframework.kafka.config.KafkaListenerContainerFactory
1111
import org.springframework.kafka.core.ConsumerFactory
1212
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
1313
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
14+
import org.springframework.kafka.listener.ContainerProperties
1415
import java.io.Serializable
1516

1617
@Configuration
@@ -19,24 +20,65 @@ class KafkaConsumerConfig<K : Serializable, V : SpecificRecordBase>(
1920
private val kafkaConsumerConfigData: KafkaConsumerConfigData
2021
) {
2122

23+
/**
24+
* 2000 TPS를 위한 Consumer 설정
25+
*
26+
* 핵심 설정:
27+
* - fetch.min.bytes: 최소 fetch 바이트 (1KB)
28+
* - fetch.max.wait.ms: 최대 대기 시간 (500ms)
29+
* - enable.auto.commit: 자동 커밋 비활성화 (수동 제어)
30+
*/
2231
@Bean
2332
fun consumerConfigs(): Map<String, Any> {
2433
return mutableMapOf<String, Any>().apply {
34+
// 기본 설정
2535
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigData.bootstrapServers)
2636
put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerConfigData.consumerGroupId)
2737
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConsumerConfigData.keyDeserializer)
2838
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConsumerConfigData.valueDeserializer)
2939
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerConfigData.autoOffsetReset)
40+
41+
// Avro 설정
3042
put(kafkaConfigData.schemaRegistryUrlKey, kafkaConfigData.schemaRegistryUrl)
3143
put(kafkaConsumerConfigData.specificAvroReaderKey, kafkaConsumerConfigData.specificAvroReader)
44+
45+
// Consumer Group 관리
3246
put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConsumerConfigData.sessionTimeoutMs)
3347
put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaConsumerConfigData.heartbeatIntervalMs)
3448
put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerConfigData.maxPollIntervalMs)
35-
put(
36-
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
37-
kafkaConsumerConfigData.maxPartitionFetchBytesDefault * kafkaConsumerConfigData.maxPartitionFetchBytesBoostFactor
49+
50+
// Fetch 설정
51+
put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
52+
kafkaConsumerConfigData.maxPartitionFetchBytesDefault *
53+
kafkaConsumerConfigData.maxPartitionFetchBytesBoostFactor
3854
)
3955
put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerConfigData.maxPollRecords)
56+
57+
// ========================================
58+
// 추가 성능 최적화 설정
59+
// ========================================
60+
61+
// Fetch 최소 바이트: 1KB
62+
// - 브로커가 최소 이 크기만큼 데이터가 쌓일 때까지 대기
63+
// - 너무 작으면 네트워크 오버헤드, 너무 크면 지연 발생
64+
put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024)
65+
66+
// Fetch 최대 대기 시간: 500ms
67+
// - fetch.min.bytes에 도달하지 않아도 이 시간 후 응답
68+
// - 실시간성과 처리량의 균형
69+
put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500)
70+
71+
// 자동 커밋 비활성화 (수동 제어)
72+
// - Spring Kafka의 AckMode로 제어
73+
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
74+
75+
// Isolation Level: read_committed
76+
// - 트랜잭션 커밋된 메시지만 읽음
77+
// - 데이터 정합성 보장
78+
put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
79+
80+
// Client ID (모니터링용)
81+
put(ConsumerConfig.CLIENT_ID_CONFIG, "fds-consumer-\${spring.application.name}")
4082
}
4183
}
4284

@@ -45,14 +87,64 @@ class KafkaConsumerConfig<K : Serializable, V : SpecificRecordBase>(
4587
return DefaultKafkaConsumerFactory(consumerConfigs())
4688
}
4789

90+
/**
91+
* 단일 이벤트 처리용 Kafka Listener Container Factory
92+
*
93+
* 설정:
94+
* - Batch Listener: false (단일 이벤트)
95+
* - Concurrency: 8 (파티션당 1 스레드)
96+
* - AckMode: MANUAL_IMMEDIATE (수동 커밋, 즉시)
97+
*/
4898
@Bean
4999
fun kafkaListenerContainerFactory(): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>> {
50100
val factory = ConcurrentKafkaListenerContainerFactory<K, V>()
101+
51102
factory.consumerFactory = consumerFactory()
103+
104+
// 단일 이벤트 처리
52105
factory.isBatchListener = kafkaConsumerConfigData.batchListener
106+
107+
// Concurrency 설정 (파티션 수와 동일하게)
53108
factory.setConcurrency(kafkaConsumerConfigData.concurrencyLevel)
109+
110+
// 자동 시작
54111
factory.setAutoStartup(kafkaConsumerConfigData.autoStartup)
55-
factory.containerProperties.pollTimeout = kafkaConsumerConfigData.pollTimeoutMs
112+
113+
// Container Properties 설정
114+
factory.containerProperties.apply {
115+
pollTimeout = kafkaConsumerConfigData.pollTimeoutMs
116+
117+
// AckMode: MANUAL_IMMEDIATE
118+
// - 메시지 처리 후 즉시 offset commit
119+
// - 빠른 커밋으로 재처리 최소화
120+
ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
121+
122+
// Error Handler (선택적)
123+
// - 에러 발생 시 재시도 또는 DLQ 전송
124+
// setCommonErrorHandler(...)
125+
}
126+
56127
return factory
57128
}
58-
}
129+
130+
/**
131+
* 배치 처리용 Kafka Listener Container Factory (선택적)
132+
* 현재는 사용하지 않음 - 단일 이벤트 처리 방식 사용
133+
*/
134+
// @Bean
135+
// fun batchKafkaListenerContainerFactory(): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>> {
136+
// val factory = ConcurrentKafkaListenerContainerFactory<K, V>()
137+
//
138+
// factory.consumerFactory = consumerFactory()
139+
// factory.isBatchListener = true
140+
// factory.setConcurrency(kafkaConsumerConfigData.concurrencyLevel)
141+
// factory.setAutoStartup(false) // 수동 시작
142+
//
143+
// factory.containerProperties.apply {
144+
// pollTimeout = kafkaConsumerConfigData.pollTimeoutMs
145+
// ackMode = ContainerProperties.AckMode.BATCH
146+
// }
147+
//
148+
// return factory
149+
// }
150+
}

services/fds/infra/src/main/kotlin/io/github/hyungkishin/transentia/infra/event/TransferKafkaListener.kt

Lines changed: 41 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,74 +2,72 @@ package io.github.hyungkishin.transentia.infra.event
22

33
import io.github.hyungkishin.transentia.application.service.AnalyzeTransferService
44
import io.github.hyungkishin.transentia.infrastructure.kafka.model.TransferEventAvroModel
5+
import org.apache.kafka.clients.consumer.ConsumerRecord
56
import org.slf4j.LoggerFactory
6-
import org.springframework.beans.factory.annotation.Value
77
import org.springframework.kafka.annotation.KafkaListener
8-
import org.springframework.messaging.handler.annotation.Headers
8+
import org.springframework.kafka.support.Acknowledgment
9+
import org.springframework.kafka.support.KafkaHeaders
10+
import org.springframework.messaging.handler.annotation.Header
911
import org.springframework.messaging.handler.annotation.Payload
1012
import org.springframework.stereotype.Component
1113

1214
@Component
1315
class TransferKafkaListener(
14-
@Value("\${app.transfer.topic}") private val transferTopic: String,
1516
private val analyzeTransferService: AnalyzeTransferService,
1617
private val transferEventMapper: TransferEventMapper,
1718
) {
1819
private val log = LoggerFactory.getLogger(javaClass)
1920

2021
/**
21-
* TODO: 이벤트를 처리하는 쪽의 성능
22-
* TODO: 메세지 중복처리 할때의 문제 ( 현재 너무 risk ) -> 방어책
23-
* TODO: offset update 여부 ( Big data tools 로 확인 )
24-
*
25-
* - 보내는 쪽과 받는쪽의 쓰루풋을 어떻게 조율 할 것인지
26-
* - producer 몇대 , consumer 몇대 , 파티션 몇개
22+
* 단일 이벤트 처리 방식으로 변경
23+
* - 2000 TPS 목표를 위한 최적화
24+
* - 파티션별 병렬 처리
25+
* - 빠른 오프셋 커밋
2726
*/
2827
@KafkaListener(
2928
id = "\${kafka-consumer-config.consumer-group-id}",
3029
topics = ["\${app.transfer.topic}"],
30+
containerFactory = "kafkaListenerContainerFactory"
3131
)
3232
fun receive(
33-
@Payload messages: List<TransferEventAvroModel>,
34-
@Headers headers: Map<String, Any>
33+
@Payload message: TransferEventAvroModel,
34+
@Header(KafkaHeaders.RECEIVED_PARTITION) partition: Int,
35+
@Header(KafkaHeaders.OFFSET) offset: Long,
36+
@Header(value = "eventType", required = false) eventType: String?,
37+
@Header(value = "X-Trace-Id", required = false) traceId: String?,
38+
consumerRecord: ConsumerRecord<String, TransferEventAvroModel>,
39+
acknowledgment: Acknowledgment?
3540
) {
36-
val eventType = headers["eventType"]?.toString()
37-
val traceId = headers["X-Trace-Id"]?.toString()
41+
try {
42+
log.debug(
43+
"[FDS-Consumer] Received - partition={} offset={} eventId={} traceId={}",
44+
partition, offset, message.eventId, traceId
45+
)
3846

39-
log.info("@@@@@[FDS-Consumer] RECEIVED {} messages, traceId={}", messages.size, traceId)
47+
// Domain Event 변환
48+
val domainEvent = transferEventMapper.toDomain(message)
4049

41-
// TODO : offset 동작 확인
42-
messages.forEach { avroMessage ->
43-
try {
44-
log.info(
45-
"@@@@@[FDS-Consumer] Processing eventId={} amount={} status={}",
46-
avroMessage.eventId, avroMessage.amount, avroMessage.status
47-
)
50+
// FDS 분석 실행
51+
val riskLog = analyzeTransferService.analyze(domainEvent)
4852

49-
// threadPool 배경 -> 현대
50-
// GC -> 자바에서 / redis | <- CS
53+
log.info(
54+
"[FDS-Consumer] Analysis complete - eventId={} decision={} hits={}",
55+
domainEvent.eventId,
56+
riskLog.decision,
57+
riskLog.ruleHits.size,
58+
)
5159

52-
// batch -> SCDF
60+
// 수동 커밋 (MANUAL_IMMEDIATE 모드인 경우)
61+
acknowledgment?.acknowledge()
5362

54-
val domainEvent = transferEventMapper.toDomain(avroMessage)
55-
56-
// offset commit 을 할 수 있나 ?
57-
val riskLog = analyzeTransferService.analyze(domainEvent)
58-
59-
log.info(
60-
"[FDS-Consumer] Analysis complete - eventId={} decision={} hits={}",
61-
domainEvent.eventId, riskLog.decision, riskLog.ruleHits.size
62-
)
63-
// TODO: Thread.sleep 을 걸었을때의 문제 발생 -> 여러 인스턴스 에서 책정하는것이 명확.
64-
// TODO: Docker -> 인스턴스 3 대 -> log 확인
65-
66-
} catch (e: Exception) {
67-
// TODO: 예외 발생시, 카프카 장애 대응 확인
68-
// TODO: 카프카 쪽의 영향도 확인
69-
log.error("[FDS-Consumer] Analysis failed - eventId={}", avroMessage.eventId, e)
70-
// 재처리를 위해 예외 전파
71-
throw e
72-
}
63+
} catch (e: Exception) {
64+
log.error(
65+
"[FDS-Consumer] Analysis failed - partition={} offset={} eventId={} error={}",
66+
partition, offset, message.eventId, e.message, e
67+
)
68+
// 예외 발생시 재처리를 위해 전파
69+
throw e
7370
}
7471
}
72+
7573
}

services/fds/instances/api/src/main/resources/application.yml

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,25 @@ spring:
99
username: postgres
1010
password: pass1234
1111
driver-class-name: org.postgresql.Driver
12+
# HikariCP 최적화
13+
hikari:
14+
maximum-pool-size: 20
15+
minimum-idle: 10
16+
connection-timeout: 30000
17+
idle-timeout: 600000
18+
max-lifetime: 1800000
1219

1320
jpa:
1421
open-in-view: false
1522
hibernate:
1623
ddl-auto: none
17-
show-sql: true
24+
show-sql: false
25+
properties:
26+
hibernate:
27+
jdbc:
28+
batch_size: 20
29+
order_inserts: true
30+
order_updates: true
1831

1932
flyway:
2033
enabled: false
@@ -23,35 +36,35 @@ spring:
2336

2437
logging:
2538
level:
26-
org.hibernate.SQL: DEBUG
27-
org.hibernate.type.descriptor.sql.BasicBinder: TRACE
39+
org.hibernate.SQL: INFO
40+
org.hibernate.type.descriptor.sql.BasicBinder: INFO
2841
org.springframework.kafka: INFO
29-
io.github.hyungkishin.transentia: DEBUG
42+
io.github.hyungkishin.transentia: INFO
3043

3144
kafka-config:
3245
bootstrap-servers: host.docker.internal:9094
3346
schema-registry-url-key: schema.registry.url
3447
schema-registry-url: http://localhost:8085
35-
num-of-partitions: 8
48+
num-of-partitions: 3
3649
replication-factor: 1
37-
3850
kafka-consumer-config:
3951
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
4052
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
4153
consumer-group-id: fds-consumer-group
4254
auto-offset-reset: earliest
4355
specific-avro-reader-key: specific.avro.reader
4456
specific-avro-reader: true
45-
batch-listener: true
57+
58+
batch-listener: false
4659
auto-startup: true
47-
concurrency-level: 2 # 2000 TPS 는 4
60+
concurrency-level: 3
61+
max-poll-records: 100
62+
max-partition-fetch-bytes-default: 1048576
63+
max-partition-fetch-bytes-boost-factor: 1
64+
poll-timeout-ms: 1000
4865
session-timeout-ms: 10000
4966
heartbeat-interval-ms: 3000
5067
max-poll-interval-ms: 300000
51-
max-poll-records: 500
52-
max-partition-fetch-bytes-default: 1048576
53-
max-partition-fetch-bytes-boost-factor: 1
54-
poll-timeout-ms: 500
5568

5669
app:
5770
transfer:

0 commit comments

Comments
 (0)