Java

Thread Pool wait Queue가 꽉찬다면 어떻게 해결해야할까?

DEV_GOLF 2023. 5. 4. 22:33
반응형

배경.

배치를 만들던 도중에 Bean으로 등록해놓은 Thread Pool에 Thread가 꽉차서 Queue에 더 이상 Job이 들어갈 수 없다는 예외가 발생했습니다. 예를 들기 위해 테스트 코드를 하나 작성해보겠습니다. 

class ThreadPoolExecutorTest {

    @Test
    fun testThreadPoolQueueCapacity() {
        val executor = ThreadPoolExecutor(
            2, // core pool size
            2, // max pool size
            0L, // keep alive time
            TimeUnit.MILLISECONDS, // time unit for keep alive time
            LinkedBlockingQueue(1) // queue with capacity 1
        )

        Assertions.assertThrows(RejectedExecutionException::class.java) {
            for (i in 1..4) {
                executor.submit {
                    try {
                        println("Task $i started")
                        Thread.sleep(5000)
                        println("Task $i completed")
                    } catch (e: InterruptedException) {
                        e.printStackTrace()
                    }
                }
            }
        }

        executor.shutdown()
    }
}

RejectedExecutionException이 발생하는 예시입니다. 

그렇다면 이 문제를 어떻게 해결해야할까요 ? 해결하기 전에 Thread Pool에서 작업들을 어떻게 관리하는지 알아봅시다. 

 

Thread Pool과 Wait Queue

Thread Pool에 모든 스레드가 사용 중인데 요청이 계속들어올 수 있습니다. 그리고 이거를 wait queue에 담아 대기 시키는데요. 이 때 Java에서는 Blocking Queue에 담아 처리하고 있습니다. 대표적으로 Executors.newFixedThreadPool을 살펴보면 

내부적으로 LinkedBlockingQueue 를 사용하여 모든 스레드들이 사용중일 때 대기할 수 있는 Queue를 만듭니다. 

 

이걸 그림으로 표현하면 이렇습니다.

Main thread가 작업을 생성하여 Queue에 전달하고 Queue에 담겨있는 Job 들은 현재 사용하지 않는 Thread가 존재하면 Pool에서 Thread에게 할당 되어 처리됩니다.

 

위 테스트에서 Exception이 발생한 이유는 Main Thread가 계속해서 작업을 생산하여 Thread 들에게 전달해주어야 하는데 Blocking Queue 공간과 Thread 자원을 모두 사용 중이라 더 이상 작업을 받지 못해 발생한 에러 입니다.

 

🤔 해결하기 위한 방법 2가지

본 글에선 문제를 해결할 때 사용한 두 가지 방법을 소개 드릴 예정입니다.

  1. Blocking Queue Size 늘리기
    가장 간단한 해결 방법입니다. Queue의 capacity size를 늘려 더 많은 Job 들이 대기할 수 있게 해줍니다. 하지만 이 방법은 작업 지연 시간이 늘어나며 메모리 사용량이 크게 늘어날 수 있습니다. 서버 상황을 고려하여 늘려주어야 합니다. 

  2. CallerRunsPolicy 전략을 이용하기 
    rejected execution handler로 설정을 통해 처리하는 방법입니다. CallerRunsPolicy 전략은 스레드 풀 작업 처리 능력을 초과하는 작업이 제출될 때, 작업을 생산한 스레드가 해당 작업을 처리하게 합니다. 이 경우 스레드 풀의 작업 능력을 초과하더라도 작업이 계속 처리됩니다. 
    결론적으로 메모리 사용량을 크게 사용하지 않으면서 처리가 가능합니다. 다만 이 경우에 작업을 생산한 스레드가 직접 처리하고 다시 생산을 해야하기 때문에 작업 속도가 느려질 수 있습니다. 

코드를 보면 다음과 같습니다. 

class ThreadPoolExecutorTest {

    @Test
    fun testThreadPoolQueueCapacity() {

        val executor = ThreadPoolExecutor(
            2, // core pool size
            2, // max pool size
            0L, // keep alive time
            TimeUnit.MILLISECONDS, // time unit for keep alive time
            LinkedBlockingQueue(1) // queue with capacity 1
        )

		// CallerRunsPolicy 전략 사용
        executor.rejectedExecutionHandler = ThreadPoolExecutor.CallerRunsPolicy()

        for (i in 1..4) {
            executor.submit {
                try {
                    println("Task $i started")
                    Thread.sleep(5000)
                    println("Task $i completed")
                } catch (e: InterruptedException) {
                    e.printStackTrace()
                }
            }
        }

        executor.shutdown()
    }
}

정상적으로 처리가 된 걸 볼 수 있습니다. 

 

반대로 그럼 Blocking Queue Size를 키워보겠습니다. 

class ThreadPoolExecutorTest {

    @Test
    fun testThreadPoolQueueCapacity() {

        val executor = ThreadPoolExecutor(
            2, // core pool size
            2, // max pool size
            0L, // keep alive time
            TimeUnit.MILLISECONDS, // time unit for keep alive time
            LinkedBlockingQueue(10) // queue with capacity 10 Increase
        )

//        executor.rejectedExecutionHandler = ThreadPoolExecutor.CallerRunsPolicy()

        for (i in 1..4) {
            executor.submit {
                try {
                    println("Task $i started")
                    Thread.sleep(5000)
                    println("Task $i completed")
                } catch (e: InterruptedException) {
                    e.printStackTrace()
                }
            }
        }

        executor.shutdown()
    }
}

 

결론

저는 위 두 가지 방법 모두를 사용해서 해결했습니다. 큐 사이즈도 늘리고 만에하나 제가 예상한 큐 사이즈 보다 많은 요청량이 들어오더라도 예외가 발생하지 않고 계속 처리될 수 있도록 말입니다. 그리고 모니터링을 통해서 혹시나 해당 API가 느려지면 바로 alert을 확인할 수 있게 slack에 경보를 연동시켜놓았기 때문에 바로 대응이 가능한 상황이었습니다.

 

위 두 방법에 대해 설명 드렸지만 외에도 여러 해결 방법이 있을 수 있습니다. 만약 이메일 푸시 알람 같은 비즈니스 로직에 벗어나는 동작들은 비동기나 ExceptionHandling 으로 해결할 수 있죠.

또한 더 엄청난 양의 요청을 처리해야한다면 kafka, rabbitMQ 같은 Message Queue 등을 고려할 수 있겠습니다. 

 

마침.