본문 바로가기

Java/병렬 프로그래밍

자바 병렬 프로그래밍 - 구성 단위

4장에서 스레드 안전한 클래스를 만드는데 필요한 여러가지 기법을 봤다

특히 이미 스레드 세이프한 클래스에 안전성을 위임 하는 방법이 있었는데

효과적이고 쓸만한 방법이다.

 

자바 기본 라이브러리를 보면 병렬 프로그래밍을 작성할때 필요한 여러가지 동기화 도구가 있다.

ex. 스레드 세이프한 컬렉션 클래스들도 있고 동시에 동작하는 스레드 간의 작업을 조율 할 수 있도록 여러가지 동기화 기법도 있다.

 

이번장에서는 병렬 프로그래밍 과정에서 유용하게 사용할 수 있는 도구, 특히 자바 5,6에 포함된 클래스로 알아본다

그리고 몇가지 디자인 패턴도 볼 거다

 

동기화된 컬렉션 클래스

동기화 된 컬렉션 클래스의 대표로 Vector 와 Hashtable 이 있다.

이 클래스들은 모두 public 메소드를 클래스 내부에서 캡슐화해 내부의 값을 한 번에 하나의 스레드만 사용 할 수 있도록

제어 하면서 안전성을 확보 하고 있다.

 

- 동기화된 컬렉션 클래스의 문제점

 

 스레드 안전성을 확보는 하고 있지만 외부 프로그램 입장에서는 상황이 다르다.

스레드 A 와 B 가 동시에 시작해서 A 는 getLast 를 호출하고

B는 delete 를 호출하게 되면 getLast에서 Exception이 발생한다

Vector 입장에서는 스레드 안전성이 문제가 없는 것이지만, 단순히 없는 항목에 예외를 던진 것이니,

하지만 프로그램 입장에서는 값이 들어 있었음에도 값을 가져오니 터져 버린것이다.

따라서 올바르지 않은 방식.

 

동기화된 컬렉션 클래스는 대부분 클라이언트측 락을 사용할 수 있도록 만들어져 있어서

새로 추가하는 기능을 컬렉션 클래스에 다른 메소드와 같은 수준으로 동기화 시킬 수 있다.

클라이언트측 락을 이용해 동기화 시킨모습

- Iterator 와 ConcurrentModificationException

 

새로 추가된 컬렉션 클래스 역시 다중 연산 문제점을 해결하지 못했다.

Collection 클래스에 값을 반복시켜 읽어내는 표준적인 방법이 Iterator 다.

Iterator 를 사용해도 다른 스레드가 같은 시점에 값을 변경하는 작업을 못하게 만들어져 있다.

대신 즉시멈춤이 발생한다.

반복문이 수행되는 와중에 다른 스레드에서 값을 변경하려고 하면 즉시 멈춤이 포착해서 

즉시 ConcurrentModificationException 을 발생시키고 멈춰버린다.

 

이걸 방지하려면 , Vector 처럼 반복문 전체를 락으로 동기화 시키는 방법이 있긴 한데,

그다지 좋은 방법은 아닌게 엄청 많은 반복을 처리해야만 하면 작업 시간이 엄청 소모 될 수 있고

그 시간을 대기하고 있어야 한다.

또한 반복문 실행 중에 또 메소드 내부에서 락을 잡으면 데드락 가능성도 있어진다.

 

 비슷한 효과를 내려면, clone 메소드를 실행 하는동안만 동기화 시켜준 후

컬렉션 사본을 이용해서 반복문을 사용하는 방법이 있다.

 

병렬 컬렉션

동기화 컬렉션 클래스는 변수에 접근하는 통로를 일련화 시켜 스레드 안전성을 확보했다.

하지만 여러 스레드가 동기화된 컬렉션을 사용하려고 하면 사용성이 안좋아 진다.

 

병렬 컬렉션은 여러 스레드가 동시에 사용 할 수 있도록 설계 되어있다.

자바 5에는 HashMap 을 대치 시켜 병렬성을 확보한 ConcurrentHashMap 이 있다.

그리고 추가되어 있는 객체 목록을 반복시키며 열람하는 연산의 성능을 최우선 하는 

CopyOnWriteArrayList 가 있다.

 

* 동기화 컬렉션을 동기화 컬렉션 클래스를 병렬 컬렉션으로 교체하는 것 만으로도 성능을 향상 시킬수있다.

 

자바 5는 Queue 와 BlockingQueue 두 컬렉션 인터페이스가 추가 됐고

Queue 를 상속받는 BlockingQueue 는 큐에 항목을 추가하거나 뽑아낼 때 상황에 따라 대기 할 수 있도록 구현 되어 있다. 프로듀서 - 컨슈머 패턴을 구현할 때 굉장히 편리하게 사용 할 수 있다.

 

SortedMap 과 SortedSet 의 병렬성을 높인 ConcurrentSkipListMap 과 ConcurrentSkipListSet 클래스도 있다.

 

- ConcurrentHashMap

 

동기화 컬렉션은 항상 락을 확보 하고 있어야 한다.

근데 몇가지 연산들은 생각보다 많은 일을 해야 될 수 있다.

HashMap -> get 내부적으로 관리하는 해시 테이블을 뒤져봐야 하고

List -> contains 특정 객체가 들어있는지 확인 하기 위해 목록에 모든 객체를 순서대로 불러 equals 를 호출 해봐야한다.

(equals 는 상당한 양의 CPU 를 소모 할 수 있다.)

 

ConcurrentHashMap 은 HashMap 같이 해시 기반으로 하는데 내부적으로는 전혀 다른 동기화 기법을 사용해

병렬성과 확장성이 좋아 졌다.

이전에는 하나의 스레드만 컬렉션을 사용 할 수 있었지만, 

ConcurrentHashMap 은 락 스트라이핑(11장) 이라 불리는 세밀한 동기화 기법으로 

여러 스레드에서 공유하는 상태에 훨씬 잘 대응 할 수 있다. 훨씬 높은 성능을 보인다.

 

다른 병렬 클래스와도 같게 Iterator 를 만드는 부분이 발전했다.

Iterator 는 ConcurrentModificationException 을 발생시키지 않는다.

ConcurrentHashMap의 Iterator는 즉시 멈춤을 사용하지 않고, 대신 미약한 일관성 전략을 사용한다.

미약한 일관성 전략은 반복문과 동시에 컬렉션의 내용을 변경해도 Iterator 를 만들었던 상황대로 반복을 할 수 있다.

또한 만든 시점 이후 에 변경 내용으로 동작 할 수 도 있다.

 

단점으로는  isEmpty 나 size 가 값이 달라질 수 있고 특정 스레드가 독점해서 쓸 수 없다는건데 , 병렬성과 성능을 위해서면 주의하기는 해야하지만 감수할만 하다

 

단점이 있지만 훨씬 많은 장점이 있어서 ConcurrentHashMap을 사용 할 만하다

 

- Map 기반의 또 다른 단일 연산

 

ConcurrentHashMap 은 독점적으로 사용 할 수 있는 락이 없기 때문에,

새로운 단일 연산을 만들고자 할때 클라이언트 측 락을 사용 할 수 없다.

대신 '없을 경우에만 추가' '동일한 경우에만 제거' '동일한 경우ㅡ에만 대치' 

같은 연산들이 이미 구현 되어 있다.

 

꼭 구현해서 사용할거면 ConcurrentMap 을 사용하는게 좋다.

 

- CopyOnWriteArrayList

 

동기화된 List 클래스보다 병렬성을 훨씬 높이려고 만들어졌다.

Iterator 를 불러다가 쓸때 List 전체에 락을 걸 필요가 없다.

'변경 할 때 마다 복사' 를 바탕으로 스레드 안전성을 확보한다.

 

컬렉션의 내용이 변경 될 때 마다 복사본을 새로 만드는 전략을 취한다.

Iterator 가 실행되는 시점에 복사를 해서 사용하기 때문에

반복문일 실행되는동안 컬렉션에 추가되거나 삭제되는 내용이 있어도

동시성에 문제가 없다.

 

개별 항목마다 가시성을 확보하려는 목적으로 잠깐씩 락을 걸긴한다.

 

 

블로킹 큐와 프로듀서 -컨슈머 패턴

 

블로킹 큐는 put 과 take 라는 핵심 메소드를 갖고 있고

offer 와 poll 이라는 메소드도 갖고있다.

큐가 가득 차면 put 메소드는 값을 추가할 공간이 생길 때 까지 대기한다.

반대로 큐가 비어있으면 take 는 뽑아낼 값이 생길때까지 대기한다.

 

프로듀서 - 컨슈머 패턴은 "해야 할 일" 목록을 가운데에 두고 작업을 하는 주체와

작업을 처리하는 주체를 분리시키는 설계 방법이다.

 

프로듀서 - 컨슈머 패턴을 사용하면 작업을 만들어내는 부분과 작업을 처리하는 부분을 완전히 분리 할 수 있기 떄문에

개발 과정을 좀 더 명확하게 단순화 시킬 수 있다.

 

블로킹 큐를 사용하면 처리 할 수 있는 양보다 훨씬 많은 작업이 생겨 부하가 걸리는 상황에서도

작업량을 조절해 어플리케이션이 안정적으로 동작하도록 유도할 수 있다.

 

 

블로킹 메소드, 인터럽터블 메소드

 

스레드는 여러가지 원인에 의해 블록 당하거나 멈출 수 있다.

다른 스레드가 작업 중인 내용을 대기 할 수도 있고, I/O 작업을 기다릴수도 있다.

스레드가 블록되면 (BLOCKED, WAITING, TIMED_WAITING) 가운데 하나를 갖게 된다.

 

블로킹 연산은 일반 연산과는 달리 멈춘 상태에서 특정한 신호를 받아야 계속해서 실행 할 수 있는 연산을 말한다.

기다리던 외부 신호가 발견되면 스레드의 상태가 다시 Runnable 로 넘어가고 다시 시스템 스케줄러를 통해 CPU 를 사용 할 수 있다.

 

BlockingQueue 의 put 과 take 는 InteruptedException 이 발생할 수 있다.

InteruptedException이 발생 하는건 메소드가 블로킹 메소드라는 뜻이고

메소드에 인터럽트가 걸리면 대기 중인 상태에서 풀려나려고 노력한다.

 

인터럽트는 스레드가 서로 협력 하기 위한 방법이다

스레드A 가 스레드 B에 인터럽트를 건다는건 스레드 B에게 실행을 멈추라고 요청 하는 것이고 강제로 멈출 수 없다.

스레드B는 정상적인 종료 시점이 아니라 그전에 적절한 때를 찾아 실행 중인 작업을 멈춘다.

 

InteruptedException 이 발생할 때 대처 방법이 두가지 있다.

 

- InterruptedExcetion 전달 : InterruptedExcetion 을 호출한 메소드로 전달 하는 것.

 

- 인터럽트를 무시하고 복구 :  InterruptedExcetion 를 catch 한 후 상위 호출 메소드가 인터럽트 상황이 발생했음을 알 수 있도록 해야 한다.

 

InterruptedExcetion 가 발생 했는데 catch 로 잡고 아무런 대응을 하지 않으면 안된다.

 

동기화 클래스

블로킹 큐는 단순히 객체를 담는 컬렉션 클래스라는 것 뿐만 아니라, 프로듀서와 컨슈머 사이에서 

take 와 put 등의 블로킹 메소드를 사용해 작업 흐름을 조절 할 수 있다.

 

상태 정보를 사용해 스레드 간의 작업 흐름을 조절 할 수 있도록 만들어진 모든 클래스를

동기화 클래스 라고 한다.

 

블로킹 큐, 세마포어, 배리어, 래치 등이 있다.

다른 기능이 필요하다면 직접 구현하기도 한다(14장)

 

모든 동기화 클래스는 구조적인 특징이 있다.

모든 동기화 클래스에 접근하려는 스레드가 어떤 경우에는 통과 되고 어떤 경우에 대기 시켜야 하는지 결정하는 

상태 정보 가 있고,

상태를 변경하는 메소드를 제공하고, 동기화 클래스가 특정 상태에 진입 할 때까지 

효과적으로 대기 할 수 있는 메소드를 제공한다.

 

- 래치

 

 래치는 터미널 상태에 이를 때 까지 스레드가 동작하는 과정을 늦출 수 있도록 해주는 동기화 클래스다

일종의 관문 형태로 동작한다.

 

터미널 상태 전 까지는 문이 닫혀있다가 상태에 다다르면 모든 스레드가 통과한다.

래치는 한번 터미널 상태가 되면 이전 상태로 되돌릴수 없다.

 

예를들면 

- 특정 자원 "R" 을 확보 할때까지 대기 했다가 실행

 등이 있을 수 있다.

 

CountDownLatch 는 하나 또는 둘 이상의 스레드가 여러개의 이벤트가 일어날때까지 대기 할 수 있도록 되어있다.

 

래치의 상태는 내부의 카운트를 가지고 있고 

CountDown 메소드가 대기하던 이벤트가 발생 했을 때 카운트를 하나 낮춰준다.

즉 대기하던 이벤트가 모두 발생했을 떄 까지 대기 하는 메소드다

 

외부 스레드가 await 를 호출 할때 카운트가 0 보다 크면, 

카운터가 0이 될때까지, 또는 대기하던 스레드가 인터럽트 걸리거나 대기 시간이 길어져 타임아웃이 될때까지 대기한다.

 

 

 

시작 관문과 종료 관문을 만들었다.

시작은 카운트 1 로 초기화 하고 종료 는 전체 스레드 갯수로 값을 초기화 시켰다.

시작 관문이 열릴때까지 기다리고 있다가 

시작이 되면 그 이후 각 스레드가 작업 하게 된다. 

모든 작업이 끝나는 시점이 오면 총 얼마나 걸렸는지 시간을 호출 한다.

 

여기서 래치를 안썻으면 

스레드가 동시에 시작했을 때 작업시간이 얼마나 걸렸는지를 알 수 없다.

 

- FutureTask

 

FutureTask 는 래치와 비슷하게 동작하는데

시작 전 대기, 시작됨, 종료됨 같은 세 가지 상태를 가지고 있다.

 

-세마포어

 

카운팅 세마포어는 특정 자원이나 특정 연산을 동시에 사용하거나 

호출 할 수 있는 스레드의 수를 제한 할 때 사용 된다.

 

세마포어는 내부적으로 퍼밋 (permit) 을 만들어 내부 상태를 관리한다.

최초로 생성할 때 퍼밋의 수를 넘겨주고

외부 스레드는 퍼밋을 요청해 확보 하거나 확보한 퍼밋을 반납하게 된다.

 

acquire 메소드는 남는 퍼밋이 없으면 퍼밋이 생기거나 , 인터럽트가 걸리거나, 타임아웃이 생길때까지

대기 시킨다.

release 메소드는 확보했던 퍼밋을 다시 세마포어에게 반납하는 기능을 한다.

 

이진 세마포어를 예로 봐보자

이진 세마포어는 최초 퍼밋 값이 1 인 카운팅 세마포어다

이진 세마포어는 비재진입 락의 역할을 하는 뮤텍스 를 만들 때 활용된다.

 

세마포어는 데이터베이스 커넥션 풀을 사용할 떄 요긴 하게 쓸 수 있다.

최초 퍼밋 갯수로 컨넥션 갯수를 정하고

풀에서 자원을 할당 받으려고 할 때는 acquire 를 호출해 커넥션이 없으면 대기 하도록 하고

다 사용한 후에는 release 로 자원을 반납 하게 한다.

 

- 배리어

 

배리어는 특정 이벤트가 발생 할 때까지 여러개의 스레드를 대기 상태로 잡아 둔다는 점은 래치와 비슷하지만,

배리어는 모든 스레드가 배리어에 도착을 해야 관문이 열린다는것

 

래치는 이벤트를 기다리는 동기화 클래스인 반면 배리어는

"다른 스레드"를 기다린다는 점이 다르다

 

CyclicBarrier 클래스를 사용하면 특정 배리어 포인트에서 반복적으로 서로 만나는 기능을 만들 수 있고

커다란 문제 하나를 여러개의 작은 부분으로 분리해 병렬 처리하는 알고리즘을 구현 할 떄 사용하기 좋다.

 

스레드는 각자 배리어 포인트에 도착하면 await 를 호출 하고

await 메소드는 모든 스레드가 도착할때까지 기다린다.

모든 스레드가 도착하면 통과 시키고 배리어는 다시 초기 상태로 돌아가 다음 배리어 포인트를 준비한다.

(래치는 일회성인 것과 대비된다.)

 

 

효율적이고 확장성 있는 결과 캐시 구현

캐시를 사용하면 메모리를 좀 더 사용하긴 하지만 대기 시간을 크게 줄여 처리 용량을 늘릴 수 있다.

하지만 캐시를 대충 만들면 성능의 병목 현상이 확장성의 병목으로 바뀌는 결과를 얻을 수 있다.

효율적이면서 확장 할 수 있는 캐시를 구현해보자

 

 

HashMap과 동기화 기능을 사용해 구현한 첫번째 캐시
병렬성이 좋지 않다.

단순한 HashMap을 이용해서 구현해보자

Computable 인터페이스를 구현한 ExpensiveFunction 클래스는

결과를 뽑는데 상당히 시간이 걸린다.

여기서 Computable에 한겹을 덧씌워 이전 결과를 기억하는 캐시 기능을 추가해보자

이 방법을 메모이제이션 이라고 한다.

 

결과를 저장하는 저장소로 HashMap을 사용했다.

compute 메소드가 이미 값이 있는지 확인 후 없으면 새로 계산을 한다.

HashMap은 스레드 안전하지 않기 때문에 메소드 전체를 동기화 시키는 단순한 정책을 했다.

이러면 스레드 안전하긴 하지만 성능이 좋지 않다.

 

ConcurrentHashMap 으로 교체

위에 비해 매우 병렬프로그래밍 입장에서 개선됐다.

ConcurrentHashMap 은 이미 스레드 안전하기 때문에 

별도 동기화를 안해도된다. 

하지만 캐시의 기능으로 아직도 미흡하다.

다른 스레드가 현재 어떤 연산을 하고 있는지 알 수 없기 때문에

동일한 연산을 시작할 수 있다.

 

이 일을 잘 처리 하기 위해선 FutureTask 클래스를 쓰면 된다.

FutureTask 는 이미 끝났거나 끝날 예정인 연산 작업을 표현한다.

FutureTask 는 연산이 끝난 즉시 연산 결과를 리턴해준다.

 

FutureTask를 사용한 캐시

 

하지만 아직도 미흡한 점이 있는데 동시에 들어온 스레드에서는 연산을 두번 할 수 있다는 점이다.

타이밍이 좋지 않아서 값을 두번 계산

 

이 문제는 Map 이 단일 연산이 아니라 복합 연산이기 때문인데,

이걸 단일 연산으로 구성 하기 위해 

ConcurrentMap 의 putIfAbsent 라는 단일 연산에 저장을 한다.

최종 캐시