요즘 회사에서 RabbitMQ를 사용하여
메시지 처리 프로그래밍을 하고 있습니다. 다른 개발자분과 함께 개발 중인데
저는 Consumer 역할을 하는 데몬을 구현중이네요.
생산자-소비자라는 것이 상대적인 부분이 있어서
MQ를 중심으로 제가 개발 중인 데몬은 소비자가 되지만...
제 데몬을 중심으로는 이 소비자가 다시 생산자가 되는 그런 개념이 됩니다.
아무튼 MQ로부터 메시지를 가져오고
그것을 내부 Queue에 저장하여 이런저런 할 일들을 구현을 해야하다보니
데몬안에서도 또 다시 생산자-소비자 패턴의 모델이 되어버렸습니다.
이를위해 Java에서 제공되는 BlockingQueue를 사용하였습니다.
"자바 병렬처리 프로그래밍"이란 책에서는 이 Queue가 생산자-소비자 패턴을 구현하는데
아주 좋은 Queue라고 소개가 되어있습니다.
그래서 기본적인 사용법을 작성해보려고 합니다.
Producer.java
위 소스가 생산자 역할을 하는 소스입니다. queue를 가지고 있고
이 queue에 900ms에 한번씩 msg를 생성하여 집어 넣습니다.
Consumer.java
Starter.java
실행소스입니다.
두 스레드가 사용 할 queue를 생성 후 스레드를 실행합니다.
각 스레드의 loop 간격을 조절해가면서 이런저런 테스트를 해보시면
좋을 것 같습니다.
몇가지 특징을 소개해드리면...
1. BlockingQueue를 생성 할 때 인자로 들어가는 int 파라메터는 fixed capacity를 뜻 합니다.
producer가 queue에 msg를 넣는 속도가 너무 빠르거나 해서 저 한계를 넘어가버리면 exception이 발생합니다.
다만, BlockingQueue의 다른 구현 클래스인 LinkedBlockingQueue()를 사용하시면 capacity와 상관없이
계속 queue를 채울 수 있지만.. 무한대로 queue가 쌓이는 것은 피하는 것이 좋겠죠..
####### 3월 2일 내용추가 #########
커멘트로 풍주형님께서 지적해주신 내용입니다.
api 문서에서는 아래와 같이..
add와 put의 동작 방식이 다르게 설명이 되어있습니다. 위 예제에서도 Linked... 를 사용하시는 대신
put 메서드를 사용하시게 되면 queue에 자리가 생길때까지 기다리게 된다고 하네요.
2. Consumer에서 사용하는 take() 메서드는 queue가 비어있으면 기다립니다.
그리고, queue가 채워지면 실행이 되는 구조 입니다. Socket의 accept를 생각하시면 되겠네요..
메시지 처리 프로그래밍을 하고 있습니다. 다른 개발자분과 함께 개발 중인데
저는 Consumer 역할을 하는 데몬을 구현중이네요.
생산자-소비자라는 것이 상대적인 부분이 있어서
MQ를 중심으로 제가 개발 중인 데몬은 소비자가 되지만...
제 데몬을 중심으로는 이 소비자가 다시 생산자가 되는 그런 개념이 됩니다.
아무튼 MQ로부터 메시지를 가져오고
그것을 내부 Queue에 저장하여 이런저런 할 일들을 구현을 해야하다보니
데몬안에서도 또 다시 생산자-소비자 패턴의 모델이 되어버렸습니다.
이를위해 Java에서 제공되는 BlockingQueue를 사용하였습니다.
"자바 병렬처리 프로그래밍"이란 책에서는 이 Queue가 생산자-소비자 패턴을 구현하는데
아주 좋은 Queue라고 소개가 되어있습니다.
그래서 기본적인 사용법을 작성해보려고 합니다.
Producer.java
위 소스가 생산자 역할을 하는 소스입니다. queue를 가지고 있고
이 queue에 900ms에 한번씩 msg를 생성하여 집어 넣습니다.
Consumer.java
Starter.java
실행소스입니다.
두 스레드가 사용 할 queue를 생성 후 스레드를 실행합니다.
각 스레드의 loop 간격을 조절해가면서 이런저런 테스트를 해보시면
좋을 것 같습니다.
몇가지 특징을 소개해드리면...
1. BlockingQueue를 생성 할 때 인자로 들어가는 int 파라메터는 fixed capacity를 뜻 합니다.
producer가 queue에 msg를 넣는 속도가 너무 빠르거나 해서 저 한계를 넘어가버리면 exception이 발생합니다.
다만, BlockingQueue의 다른 구현 클래스인 LinkedBlockingQueue()를 사용하시면 capacity와 상관없이
계속 queue를 채울 수 있지만.. 무한대로 queue가 쌓이는 것은 피하는 것이 좋겠죠..
####### 3월 2일 내용추가 #########
커멘트로 풍주형님께서 지적해주신 내용입니다.
api 문서에서는 아래와 같이..
BlockingQueue.add
Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions,
returning true upon success and throwing an IllegalStateException if no space is currently available.
BlockingQueue.put
Inserts the specified element into this queue, waiting if necessary for space to become available.
add와 put의 동작 방식이 다르게 설명이 되어있습니다. 위 예제에서도 Linked... 를 사용하시는 대신
put 메서드를 사용하시게 되면 queue에 자리가 생길때까지 기다리게 된다고 하네요.
2. Consumer에서 사용하는 take() 메서드는 queue가 비어있으면 기다립니다.
그리고, queue가 채워지면 실행이 되는 구조 입니다. Socket의 accept를 생각하시면 되겠네요..