Class ConcurrentMessageListener
java.lang.Object
  org.apache.uima.adapter.jms.activemq.ConcurrentMessageListener

All Implemented Interfaces:
org.springframework.jms.listener.SessionAwareMessageListener
public class ConcurrentMessageListener extends java.lang.Object implements org.springframework.jms.listener.SessionAwareMessageListener
Message listener injected at runtime into an Aggregate to handle a race condition that arises when multiple threads simultaneously process messages from a Cas Multiplier. It is used only to process messages from a Cas Multiplier, and only if the reply queue has more than one consumer thread configured in the deployment descriptor (DD).

The listener creates a pool of threads equal in size to the number of concurrent consumers defined in the DD for the listener on the reply queue. Once a message is handled in onMessage(), it is then delegated for processing to one of the available threads from the pool. This listener guarantees processing order. It receives messages from Spring in a single thread, and if it finds a child CAS in the message, it increments the child count of the parent (input) CAS and delegates processing to the InputChannel instance.

The race condition: the Cas Multiplier sends the last child and the parent at almost the same time. Both are received by the aggregate and, if scaleout is used on the reply queue, are processed in different threads. One thread may start processing the input CAS while the other thread (with the last child) is not yet allowed to run. The first thread takes the input CAS all the way to the final step, and since at that point the input CAS has no children (the thread processing the last child has not yet updated the child count), the input CAS is released prematurely. When the thread with the last child is finally allowed to run, it finds that the parent no longer exists in the cache.
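The ordering guarantee described above can be illustrated with a minimal, self-contained sketch. It is not the UIMA-AS implementation: the class name OrderedIntakeSketch, its methods, and the map standing in for the aggregate's cache are all hypothetical. The point it shows is that the child count is incremented on the single receiving thread before any work is handed to the pool, so a parent that arrives right after its last child can never observe a count of zero.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "count first on one thread, then fan out to a pool".
final class OrderedIntakeSketch {
    // Stand-in for the aggregate's cache: parent CAS id -> outstanding children.
    private final Map<String, AtomicInteger> childCounts = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    OrderedIntakeSketch(int concurrentThreads) {
        this.pool = Executors.newFixedThreadPool(concurrentThreads);
    }

    // Assumed to be called from a single receiving thread, in arrival order.
    // parentId is null when the message does not carry a child CAS.
    void onMessage(String parentId, Runnable work) {
        if (parentId != null) {
            // The increment happens synchronously, before the hand-off below.
            childCounts.computeIfAbsent(parentId, k -> new AtomicInteger())
                       .incrementAndGet();
        }
        pool.submit(work); // actual processing runs on a pool thread
    }

    // Called by a worker once a child CAS has been fully processed.
    void childProcessed(String parentId) {
        AtomicInteger count = childCounts.get(parentId);
        if (count != null) {
            count.decrementAndGet();
        }
    }

    // The parent may be released only when no children are outstanding.
    boolean canRelease(String parentId) {
        AtomicInteger count = childCounts.get(parentId);
        return count == null || count.get() == 0;
    }

    void shutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this sketch, calling onMessage("parent-1", work) for a child makes canRelease("parent-1") return false immediately, regardless of when the pool schedules the work. It returns true again only after childProcessed("parent-1") has run.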
-
-
Constructor Summary
Constructors
ConcurrentMessageListener(int concurrentThreads, java.lang.Object delegateListener, java.lang.String destination, java.lang.ThreadGroup threadGroup, java.lang.String threadPrefix)
Creates a listener with a given number of process threads.
-
Method Summary
Modifier and Type, Method, Description
java.util.concurrent.ThreadPoolExecutor getTaskExecutor()
void onMessage(javax.jms.Message message, javax.jms.Session session)
Intercept a message to increment a child count of the input CAS.
void setAnalysisEngineController(AnalysisEngineController controller)
void stop()
-
-
-
Constructor Detail
-
ConcurrentMessageListener
public ConcurrentMessageListener(int concurrentThreads, java.lang.Object delegateListener, java.lang.String destination, java.lang.ThreadGroup threadGroup, java.lang.String threadPrefix) throws java.io.InvalidClassException
Creates a listener with a given number of process threads. This listener is injected between Spring and JmsInputChannel to enable orderly processing of CASes. It is used only on reply queues whose scale out attribute in the DD is greater than 1. Its main job is to increment the number of child CASes for a given input CAS. It does so in a single thread, and once the update completes, the listener submits the CAS for further processing by the JmsInputChannel. The CAS is submitted to a queue, where the executor assigns a free thread to process it.
Parameters:
concurrentThreads - number of threads to use to process CASes
delegateListener - JmsInputChannel instance to delegate CAS to
Throws:
java.io.InvalidClassException
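The constructor takes a thread count, a ThreadGroup, and a thread name prefix, and getTaskExecutor() returns a java.util.concurrent.ThreadPoolExecutor. How the class wires these together internally is not documented here. The sketch below shows one plausible way to build a fixed-size pool from those three inputs using only the JDK. The class PrefixedPoolSketch, the method newPool, and the "prefix-N" naming scheme are assumptions for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of UIMA-AS.
final class PrefixedPoolSketch {
    private PrefixedPoolSketch() {}

    static ThreadPoolExecutor newPool(int concurrentThreads,
                                      ThreadGroup threadGroup,
                                      String threadPrefix) {
        AtomicInteger sequence = new AtomicInteger();
        // Each worker joins the given group and gets a "<prefix>-<n>" name
        // (the naming scheme is an assumption).
        ThreadFactory factory = runnable -> new Thread(
                threadGroup, runnable, threadPrefix + "-" + sequence.incrementAndGet());
        // Fixed-size pool: core size equals maximum size. Submitted work waits
        // in an unbounded queue until a thread becomes free.
        return new ThreadPoolExecutor(
                concurrentThreads, concurrentThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                factory);
    }
}
```

Sizing the pool to match the number of concurrent consumers keeps the processing parallelism that the DD requested, while message intake itself stays on one thread.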
-
-
Method Detail
-
getTaskExecutor
public java.util.concurrent.ThreadPoolExecutor getTaskExecutor()
-
stop
public void stop()
-
setAnalysisEngineController
public void setAnalysisEngineController(AnalysisEngineController controller)
-
onMessage
public void onMessage(javax.jms.Message message, javax.jms.Session session) throws javax.jms.JMSException
Intercept a message to increment a child count of the input CAS. This method is always called in a single thread, which guarantees the order of processing. The child CAS will always arrive here first. Once the count is updated, or if this CAS is not a child, the message is delegated to one of the threads in the pool, which eventually calls the InputChannel object where the actual processing of the message begins.
Specified by:
onMessage in interface org.springframework.jms.listener.SessionAwareMessageListener
Throws:
javax.jms.JMSException
-
-