[jboss-cvs] JBoss Messaging SVN: r8237 - branches/Branch_1_4/src/main/org/jboss/jms/client/container.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Mar 14 23:45:18 EDT 2011
Author: gaohoward
Date: 2011-03-14 23:45:18 -0400 (Mon, 14 Mar 2011)
New Revision: 8237
Modified:
branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
Log:
JBMESSAGING-1850
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-03-03 10:26:05 UTC (rev 8236)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-03-15 03:45:18 UTC (rev 8237)
@@ -24,7 +24,13 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
@@ -289,6 +295,7 @@
private int consumeCount;
private boolean firstTime = true;
private volatile Thread onMessageThread;
+ private ExecutorService pool = Executors.newFixedThreadPool(1);
public int getBufferSize()
{
@@ -545,25 +552,80 @@
if (!isConnectionConsumer && !ignore)
{
- DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
-
- sessionDelegate.preDeliver(info);
+ final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
- //If post deliver didn't succeed and acknowledgement mode is auto_ack
- //That means the ref wasn't acked since it couldn't be found.
- //In order to maintain at most once semantics we must therefore not return
- //the message
-
- ignore = !sessionDelegate.postDeliver();
-
+ if (timeout <= 0)
+ {
+ sessionDelegate.preDeliver(info);
+
+ // If post deliver didn't succeed and acknowledgement mode is auto_ack
+ // That means the ref wasn't acked since it couldn't be found.
+ // In order to maintain at most once semantics we must therefore not return
+ // the message
+
+ ignore = !sessionDelegate.postDeliver();
+
+ }
+ else
+ {
+ //JBMESSAGING-1850
+ Callable<Boolean> afterReceive = new Callable<Boolean>()
+ {
+ public Boolean call() throws Exception
+ {
+ sessionDelegate.preDeliver(info);
+
+ // If post deliver didn't succeed and acknowledgement mode is auto_ack
+ // That means the ref wasn't acked since it couldn't be found.
+ // In order to maintain at most once semantics we must therefore not return
+ // the message
+
+ return !sessionDelegate.postDeliver();
+ }
+ };
+
+ java.util.concurrent.Future<Boolean> f = pool.submit(afterReceive);
+
+ long tmUsed = System.currentTimeMillis() - startTimestamp;
+
+ if (tmUsed >= timeout)
+ {
+ log.warn("Timed out before post message processing, discarding message " + m);
+ return null;
+ }
+
+ try
+ {
+ ignore = f.get(timeout - tmUsed, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("Interrupted during getting future result.", e);
+ }
+ catch (ExecutionException e)
+ {
+ log.warn("received application exception.", e.getCause());
+ Throwable t = e.getCause();
+ if (t instanceof JMSException)
+ {
+ throw (JMSException)t;
+ }
+ }
+ catch (TimeoutException e)
+ {
+ log.warn("Timed out waiting for post message processing, discarding message " + m);
+ return null;
+ }
+ }
+
if (trace)
{
- log.trace("Post deliver returned " + !ignore);
+ log.trace("Post deliver returned " + !ignore);
}
-
+
if (!ignore)
{
- m.incDeliveryCount();
+ m.incDeliveryCount();
}
}
More information about the jboss-cvs-commits
mailing list