티스토리 뷰

반응형

자 우리는 Stream 메서들을 알아 보았고 데이터를 생성해 가공하고 결과를 만들었다. 그렇다면 이러한 Stream을 왜쓰고 동작 흐름에 대해 좀 더 깊게 들어갈 필요가 있다. 

 

2부는 이 내용에 초점을 둘 것이다. 그 전에 사전 지식에 대해 알 필요가 있다.

 

동시성(Concurrency)

멀티 작업을 위해 스레드가 번갈아가며 실행하는 성질, 싱글 코어 CPU를 이용한 멀티 작업은 병렬적으로 실행되는 것이 아닌 동시성 작업이다.

 

병렬성(Parallelism)

멀티 작업을 위해 멀티 코어를 이용해 동시에 실행하는 성질

 

데이터 병렬성(Data Parallelism)

  • 전체 데이터를 나누어 서브 데이터를 만들고 서브 데이터를 병렬 처리해 작업을 빠르게 종료하는 것
  • q병렬 스트림은 데이터 병렬성을 구현
  • 멀티 코어의 수만큼 큰 요소로 나누고, 서브 요소로 분리된 스레드에서 병렬로 처리

작업 병렬성(Task Parallelism)

  • 서로 다른 작업을 병렬 처리하는 것
  • 웹 서버에서는 각각의 요청을 개별 스레드에서 병렬로 처리

Stream 동작 순서

Stream에 존재하는 메서드를 안다고 해도 내부 흐름에 대해 알지 못하면 처리속도가 지연되는 안쓴 것만 못한 상황이 발생할 수 있다. 그렇기에 우리는 내부 동작을 잘 이해하고 사용해야 한다.

 

예제를 보며 이해해 보자

public class StreamFlow {

    public static void main(String[] args) {
        Stream.of(1, 3, 5, 6, 8)
                .filter(s -> {
                    System.out.println("filter" + s);
                    return  true;
                })
                .forEach(s -> System.out.println("forEach" + s));
    }
}

만약 처음 본 사람들이라면 다음과 같은 실행을 예상했을 것이다.

public class StreamFlow01 {

    public static void main(String[] args) {
        Stream.of(1, 3, 5, 6, 8)
                .filter(i -> {
                    System.out.println("filter" + i);
                    return  true;
                })
                .forEach(i -> System.out.println("forEach" + i));
    }
}

하지만 예상과 다르게 실제로는 이런 식으로 실행이 된다.

이유는 간단하다 모든 데이터에 대해 filter가 실행되고 forEach가 실행되는 수평적 구조가 아닌 각각의 데이터에 대해 filter와 forEach가 먼저 수행하는 수직적 구조로 순회하기 때문이다. 

 

이렇게 순회하는 이유는 무엇일까? 

 

다음 동작을 확인해보자

public class StreamFlow02 {

    public static void main(String[] args) {
        boolean anyMatch = Stream.of("1", "3", "5", "6", "8")
                .map(s -> {
                    System.out.println("map : " + s);
                    return Integer.parseInt(s);
                })
                .anyMatch(s -> {
                    System.out.println("anyMatch : " + s);
                    return s == 3;
                });
    }
}

이 코드는 몇 번 실행될까? 수평적으로 동작한다면 map으로 각 문자열들을 Integer형으로 변환을 5번하고 anyMahtch가 3을 찾아야 하니 1하고 3 두 번 실행 되어 총 7번이 실행되어야 하지만 위 스트림은 수직적인 구조로 동작하기에 실제로 실행 시 map 2번 anyMatch2번 총 4번 실행 된다.

 

이러한 방식으로 실제로 연산되어야할 수를 무려 3번이나 줄였다!!

이러한 실행 순서를 공부하고 순서를 고려하여 Stream을 사용하는 것을 추천한다. 이러한 수직적인 구조는 직관적인 구조가 아니고 어떤 상황에서든 연산을 줄여주는 것은 아니기 때문이다. 예를 살펴보자

 

public class StreamPerform {

    public static void main(String[] args) {
        Stream.of("a", "b", "c", "d", "e")
                .map(s -> {
                    System.out.println("map : " + s);
                    return s.toUpperCase();
                })
                .filter(s -> {
                    System.out.println("filter : " + s);
                    return s.startsWith("A");
                })
                .forEach(s -> System.out.println("forEach : " + s));
    }
}

이 코드는 현재 map에서 대문자로 변환 후 대문자 A로 시작하는 것을 찾게 동작을 하는 Stream이다.  동작은 map을 거쳐 filter로 가는식으로 동작하기에 2개의 함수가 5번씩 동작을 수행하고 1번의 forEach연산 총 11번의 연산이 수행된다. 이를 우리는 더 효과적으로 바꿀 수 있다.

public class StreamPerform {

    public static void main(String[] args) {
        Stream.of("a", "b", "c", "d", "e")
                .filter(s -> {
                    System.out.println("filter : " + s);
                    return s.startsWith("a");
                })
                .map(s -> {
                    System.out.println("map : " + s);
                    return s.toUpperCase();
                })
                .forEach(s -> System.out.println("forEach : " + s));
    }
}

filter에서 "a"로만 시작되는 요소만 통과하여 내려줄 것이다. 그렇기에 filter 5번 map 과 forEach가 각 1번씩 수행되며 겨우 7번의 연산으로 같은 결과를 얻을 수 있다. 

 

보았는가 약간의 함수만 잘 변경해도 우리는 연산에 큰 변화를 줄 수 있다. 이것이 우리가 동작 순서를 이해해야 하는 이유이다.

 

자 이제 동작 순서와 그 중요성에 대해 알아보았다. 그렇다면 이제 아주 많은 양의 데이터를 처리해야 하는 경우에 런타임의 성능을 높이기 위해 사용되는 병렬 스트림에 대해서 좀 더 깊게 알아보자

 

병렬 스트림 성능에 대하여

Stream은 내부적으로 fork & join을 사용하며, ForkJoinPool. commonPool()을 통해 사용가능한 공통의 ForkJoinPool의 갯수를 확인할 수 있다. 내제되어 있는 ThreadPool의 갯수는 최대 5개이고 사용가능한 CPU에 수에 따라 다르다.

public class ForkJoin {

    public static void main(String[] args) {
        System.out.println(ForkJoinPool.commonPool());
    }
}

// java.util.concurrent.ForkJoinPool@880ec60[Running, parallelism = 3, size = 0, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]

 

또한 이 값은 JVM 매개변수를 통해서 별도로 설정도 할 수 있다.

 

성능을 높이기 위해 사용되는 병렬 스트림이지만 항상 순차 스트림보다 빠른 것은 아니다. 병렬 스트림은 병렬화 하기 위해 스트림을 재귀적으로 분할하고, 스레드를 할당하고, 최종적으로 부분 결과를 하나로 합치는 과정이 필요하기 때문에 오히려 속도가 느릴 수 있다.

 

  • 요소의 수가 많고 요소 당 처리시간이 긴 경우
    • 병렬 처리는 스레드 풀을 생성하고 스레드를 생성하는 추가적인 비용이 발생한다. 때문에 요소의 수가 적다면 순차가 더 빠를 수 있다.
    • 멀티 코어에서 데이터의 이동은 오버헤드가 크기 때문에 데이터 전송 시간보다 오래 걸리는 작업만 병렬로 처리하는 것이 좋다.
  • 스트림 소스의 종류
    • ArrayList나 배열은 인덱스로 요소를 관리해 분리가 쉽지만 LinkedList는 분할을 위해서는 모두 탐색을 해야 하기 때문에 느릴 수 있다.
  • 코어의 수
    • 만약 실행하는 프로세서가 싱글 코어라면 스레드의 수만 증가하고 동시성 작업으로 진행되기 때문에 오히려 성능이 하락한다.
  • 병렬로 수행하는 어려운 스트림 모델
    • iterate()의 경우, 이전 연산의 결과가 스트림의 입력에 영향을 미친다. 때문에 이전 연산이 완료되어야 다음 스트림으로 넘어갈 수 있기 때문에 분할하기 어려워 성능이 오히려 하락한다.
  • 박싱의 최소화
    • 박싱과 언방싱은 성능을 크게 하락시키기 때문에 기본형 스트림을 우선 사용해야 한다.
  • 순서에 의존하는 연산
    • 순서에 의존하는 연산은 스트림에서 수행하게 되면 많은 비용이 발생한다.
    • 순서가 중요하지 않다면 findFirst보다 findAny가 좋고, 단순 limit보다 unordered().limit가 더 효율적이다.

포그/조인 프레임워크 

병렬 스트림 내부에서 포크/조인 프레임워크를 활용하고 있기 때문에 정확한 동작방식을 이해하면 병렬스트림을 쉽게 이해할 수 있다.

 

Recursive Task

스레드 풀은 사용하기 위해서는 RecursiveTask<R>의 서브 클래스를 구현하면 된다. 여기서 R은 병렬화를 통해 연산된 결과이다.

 

RecursiveTask를 구현하려면 compute() 추상메소드를 구현하면 된다. 이 메소드는 테스크를 서브 테스크로 분할하는 로직, 더이상 분할이 불가능할 때 서브 테스크의 결과를 생산할 알고리즘을 정의한다.

if(Task is small) { // 테스크가 작아 분할이 불가능
    Execute the task // 순차적으로 테스크 계산
} else {
    // 테스크를 두 서브 테스크로 분할
    ForkJoinTask first = getFirstHalfTask();
    first.fork();
    ForkJoinTask second = getSecondHalfTask();
    second.compute();
    // 모든 서브 테스크의 연산이 완료될 때까지 기다리고, 결과를 합친다.
    first.join();
}

// https://girawhale.tistory.com/131

각각의 서브 테스크의 크기가 작아질 때까지 재귀적으로 위 테스크를 분할한다 

더이상 분할이 불가능하면 서브 테스크를 병렬로 수행하고 나온 부분 결과를 조합 해 최종 결과를 만든다.

public class ForkJoinSumCalculator extends RecursiveTask<Long> {

    private final long[] numbers;
    private final int start;
    private final int end;
    public static final long THRESHOLD = 1000;

    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int len = end - start; // 서브 테스크의 배열 길이

        // 정해진 기준 값 이하로 배열의 길이가 줄어들 경우 결과를 반환
        if (len <= THRESHOLD) {
            return computeSequentially();
        }

        // 왼쪽 절반으로 분할
        ForkJoinSumCalculator left = new ForkJoinSumCalculator(numbers, start, start + len / 2);
        left.fork(); // ForkJoinPool의 다른 스레드로 새로 생성한 테스크를 비동기로 실행

        // 나머지 절반을 분할
        ForkJoinSumCalculator right = new ForkJoinSumCalculator(numbers, start + len / 2, end);
        Long rightRet = right.compute(); // 두 번째 서브 테스크를 동기로 실행한다.
        Long leftRet = left.join(); // 첫 번째 서브 테스크의 결과를 읽거나, 아직 결과가 나오지 않았다면 대기

        return leftRet + rightRet; // 두 서브 테스크의 결과를 합해 반환한
    }

    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] numbers = new long[1000];

        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = (long) (Math.random()*100);
        }


        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        Long ret = new ForkJoinPool().invoke(task); // 결과 반환

        System.out.println(ret);
    }
}
  • join()은 결과를 반환할 때까지 블록시키기 때문에 항상 두 서브 테스크를 시작한 뒤 호출해야 한다. 그렇지 않으면, 기존의 순차 스트림보다 느리게 된다.
  • RecursiveTask내에서는 ForkJoinPool의 invoke()를 사용하면 안되고 comput()나 fork()메소드를 호출해야 한다.
  • 분리된 서브테스크 중  한 작업에만 comput() 작업을 호출해야 한다. 한 테스크는 같은 스레드를 재사용할 수 있어 오버헤드가 감소한다.
  • 병렬 계산은 디버깅이 어렵다. 다른 스레드에서 compute()를 호출하기 때문에 stack trace가 도움이 되지 않는다.

 

오늘은 병렬 처리와 성능에 관한 얘기를 길게 한거 같다 다음에는 지연처리 Null-safe Stream 줄여 쓰는 Stream에 대해 얘기를 해볼 예정이다.

 

Reference.

https://mangkyu.tistory.com/115?category=872426 

 

[Java] Stream API의 활용 및 사용법 - 고급 (4/5)

1. FlatMap을 통한 중첩 구조 제거 [ FlatMap이란? ] 만약 우리가 처리해야 하는 데이터가 2중 배열 또는 2중 리스트로 되어 있고, 이를 1차원으로 처리해야 한다면 어떻게 해야 할까? 이러한 경우에 map

mangkyu.tistory.com

https://girawhale.tistory.com/131

 

[Java] 병렬 데이터 처리(병렬 스트림, 포크/조인 프레임워크)

병렬 처리(Parallel Operation)란 멀티 코어 환경에서 하나의 작업을 분할해 각각의 코어가 병렬적으로 처리하는 것이다. 자바7 이전에는 데이터 컬렉션을 병렬 처리하기 위해서는 데이터를 분할하고

girawhale.tistory.com

 

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함