티스토리 뷰
https://golf-dev.tistory.com/75
무엇이 문제인가?
Redis는 10만 TPS의 처리량을 보여주기 때문에 DB write 작업에 한번에 많은 트래픽이 몰릴 수 있습니다. 그리고 이러한 요청이 밀려 들어오면 장애로 이어지기 때문에 꽤나 중요한 문제일 수 있습니다. 또한 JPA를 사용하는 경우 자체적으로 Blocking Call을 하기 때문에 병목이 생길 수 있습니다.
그렇다면 어떻게 해결할 수 있을까요 ?
먼저 부하를 덜기 위해 필요한 상황은 DB에 저장하는 요청을 조절하여야 한다는 것입니다. 일정 시간은 지연시켜 DB 서버가 안정화 되기 까지 기다려야하는 것이죠.
kafka는 이러한 요구사항에 적절한 솔루션을 제공해주었습니다.
Kafka 도입하기
kafka는 데이터 스트리밍, 파이프라인 등을 위해 설계된 분산 이벤트 스트리밍 오픈 소스입니다. linkedIn에서 개발하여 현재 널리 쓰이고 있습니다.
Pub-sub 모델의 메시지 큐 형태로 분산환경에 특화되어 있으며 Source Application과 Target Application 사이의 디커플링을 해주는 역할도 합니다.
Message Queue란 Producer 와 Consumer 간에 데이터를 전달하기 위한 중간 Broker를 두어 프로세스간 데이터 전달하는 메시지 지향 미들웨어를 구현한 시스템으로 별도의 통신 없이 데이터 전달이 가능합니다.
Message Queue의 장점은 비동기 성이기 때문에 Consumer에서의 처리는 독립적으로 동작해 Latency가 오래걸리는 작업을 사용자가 기다리지 않아도 되게 디커플링 해주는 점이 있습니다. 또한 디커플링이 되며 애플리케이션과 결합도가 낮아지고 Producer나 consumer를 확장하더라도 서로 언제든지 데이터를 주고받을 수 있어 확장성 또한 갖고 있습니다.
또한 kafka는 Redis나 RabbitMQ와는 다르게 Consumer가 polling하여 데이터를 주고 받기 때문에 Consumer 상태에 따라 처리가 가능하다는 장점이 있고 디스크에 메시지를 일정 주기 동안 보관하여 데이터 유실이 적습니다.
(반면에 Redis는 휘발성 RabbitMQ는 설정에 따라 다름, 또한 RabbitMQ는 polling을 지원하지만 매커니즘이 push에 적합하여 kafka에 비해 polling 방식에서 좋은 성능을 발휘 못함)
자세한 설명은 추후 블로깅으로 찾아뵙는 것으로 하고 문제를 해결해보겠습니다.
Kafka 설정하기
카프카를 먼저 설정해줍시다. 카프카는 docker를 이용해 setting 했으며 compose 정보는 다음과 같습니다.
zookeeper:
platform: linux/x86_64 # mac용 세팅
image: wurstmeister/zookeeper
container_name: coupon_zookeeper
ports:
- "2181:2181"
kafka:
platform: linux/x86_64 # mac용 세팅
image: wurstmeister/kafka:2.12-2.5.0
container_name: coupon_kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
카프카는 내부적으로 topic이라는 이벤트 저장소를 두고 있습니다.
topic을 세팅 해주어야 Producer와 Consumer 사이에 통신을 할 수 있고 발생한 이벤트들을 저장할 수 있습니다.
@Configuration
class KafkaProducerConfig(
@Value("\${kafka.host}") private val host: String,
@Value("\${kafka.port}") private val port: Int
) {
@Bean
fun producerFactory(): ProducerFactory<String, String> {
val config = HashMap<String, Any>()
config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "${host}:${port}"
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return DefaultKafkaProducerFactory(config)
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
}
host와 port를 설정하고 객체를 직렬화하여 보내야하기 때문에 Serializer를 설정하여 데이터 전송 중 손상되지 않게 해야합니다. 이제 그럼 컨슈머 쪽에서 데이터를 받아야 하니 Consumer 쪽 kafka 설정도 해줍시다.
Consumer Group 명은 group_1로 지정하여 Consumer Group을 생성해주고 producer와 동일하게 host와 port 그리고 직렬화 전략을 설정해줍니다.
@Configuration
class KafkaConsumerConfig(
@Value("\${kafka.host}") val host: String,
@Value("\${kafka.port}") val port: Int,
@Value("\${kafka.group-id}") val groupId: String
) {
@Bean
fun consumerFactory(): ConsumerFactory<String, String> {
val config = HashMap<String, Any>()
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "${host}:${port}"
config[ConsumerConfig.GROUP_ID_CONFIG] = groupId
config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
return DefaultKafkaConsumerFactory(config)
}
@Bean
fun kafkaListenerContainFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory()
return factory
}
}
추후에 kafka에 대한 블로깅을 하면서 consumer group에 대해 설명 드리겠습니다. 현재는 간단하게 하나의 topic에 대한 consumer 서버를 그룹화 하는 것이라고 알고 있으면 됩니다.
자 그러면 각각 이벤트를 발행하고 소비하는 producer와 consumer들을 개발해보겠습니다.
Producer 구성
@Transactional
fun applyVer3(userId: Long) {
val count: Long = couponCountRepository.increment(userId)
?: throw IllegalArgumentException("쿠폰 발급 수량 정보를 가져오는데 실패했습니다.")
couponCountCheck(count)
couponCreateProducer.create(userId)
}
Redis에서 갯수를 count하며 매번 갯수를 확인하고 통과한다면 다음 프로세스를 탈 수 있게 설계했습니다.
@Component
class CouponCreateProducer(
private val kafkaTemplate: KafkaTemplate<String, String>
) {
fun create(userId: Long) {
kafkaTemplate.send("coupon_create", userId.toString())
}
}
producer 로직은 간단합니다. coupon_create이름의 topic에 userId를 전송합니다. 이 때 StringSerializer 전략으로 직렬화 하기 때문에 toString() 으로 문자열을 전송합니다.
Consumer 구성
@Component
class CouponCreatedConsumer(
private val couponRepository: CouponRepository,
private val failEventHistoryService: FailEventHistoryService
) {
@KafkaListener(topics = ["coupon_create"], groupId = "group_1")
fun couponCreateListener(userId: Long) {
val result = kotlin.runCatching { couponRepository.save(Coupon(userId)) }
val isFailure = result.exceptionOrNull()
if (isFailure != null) {
val errorMessage = isFailure.message?: "ProvideCouponFail"
val failEventHistory = FailEventHistory(userId, errorMessage, EventStatus.FAIL)
failEventHistoryService.save(failEventHistory)
}
}
}
Consumer는 coupon_create라는 topic을 바라보고 group_1에 속해있습니다. 그리고 userId를 topic으로부터 받아와 데이터를 저장해주고 있습니다.
또한 비동기로 쿠폰 발행과 발행 신청 두 애플리케이션이 디커플링 되었기 때문에 사용자가 특정 오류로 인해 쿠폰 발행엔 성공했지만 실제로 발행을 못받을 수가 있습니다. 그렇기 때문에 FailEvent를 저장하고 장애에 대응이 가능하게 errormessage를 받아 FailEvent에 같이 저장해줍니다. 또한 재발행 할 수 있게 필요한 데이터를 저장해줍니다.
그리고 스케줄러를 통해 재발행 해주어야 하기 때문에 scheduler를 추가해주겠습니다.
@Scheduled(cron = "0 */5 * * * ?")
fun publishCouponFailOverScheduler() {
failEventHistoryService.retry()
}
@Transactional
fun retry() {
val historyByStatus = failEventHistoryRepository.findByEventStatus(EventStatus.FAIL)
for (status in historyByStatus) {
status.eventStatus = EventStatus.PROCESSING
val userId = status.userId
couponService.failCouponRetry(userId)
status.eventStatus = EventStatus.SUCCESS
}
}
FailEvent를 Fail 상태인 데이터만 가져와서 5분마다 다시 저장해주고 Success로 바꿔주고 있습니다. 또한 중간에 예기치못한 오류는 Transactional을 통해 rollback하여 Fail이 유지되어 언제든 재시작 할 수 있게 설계했습니다.
전체 플로우를 그림으로 보면 다음과 같습니다.
테스트
테스트 코드는 다음과 같습니다.
@Test
@DisplayName("여러 사용자가 쿠폰을 구매한다.")
fun applyManyPeople() {
val threadCount = 1000
val executorService = Executors.newFixedThreadPool(32)
val latch = CountDownLatch(threadCount)
for (i in 1 .. threadCount) {
executorService.execute {
try {
couponService.applyVer3(i.toLong())
} finally {
latch.countDown()
}
}
}
latch.await()
Thread.sleep(10000)
val count: Long = couponRepository.count()
assertThat(count).isEqualTo(100)
}
Thread sleep은 kafka에서 데이터를 지연 시켜 저장시키기 때문에 일부로 10초를 테스트 지연시켜 데이터를 확인할 수 있게 했습니다.
그렇다면 결과를 봅시다.
결과
성공한걸 확인할 수 있습니다.
회고
DB 부하를 줄이기 위해 kafka를 두고 천천히 데이터를 넣어주게 했습니다. 실제로 테스트 결과 바로 DB에 저장되는 것이 아닌 조금의 delay가 되는걸 확인했습니다. 이렇게 delay로 천천히 넣기 때문에 Consumer 입장에선 데이터를 천천히 넣게 되어 부하를 줄일 수 있습니다. 설령 그럼에도 부하가 걸려 DB service가 죽더라도 kafka에 commit 하기 전까지 offset을 업데이트 시키지 않아 consumer가 poll 하기 전까지 이벤트를 저장해놓을 수 있어 비교적 안전하게 처리가 가능해집니다.
또한 Consumer에서의 처리가 분리되어 문제가 될 수 있기 때문에 이 또한 고민하여 Fail-Over를 처리해주었습니다.
마침.
'kotlin' 카테고리의 다른 글
선착순 쿠폰 서비스에서 데이터 정합성 개선하기 (0) | 2023.06.15 |
---|---|
코루틴을 사용하면 스프링에서 비동기를 편하게 쓸 수 있을까? (0) | 2023.03.13 |
- Total
- Today
- Yesterday
- 코드
- Spring
- 프로젝트
- docker
- 면접
- MySQL
- 코딩
- 개발자
- DB
- JPA
- swarm
- 동시성
- 프로그래밍
- 자바
- 백엔드
- DevOps
- 면접준비
- 취업
- IT
- 인터뷰
- 게시판
- java
- 개발
- CS
- Redis
- thread
- Kotlin
- 면접 준비
- 취업준비
- 취준
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |