[jboss-cvs] JBoss Messaging SVN: r8309 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1825_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1839_JBMESSAGING-1842_JBMESSAGING-1850_JBMESSAGING-1855: src/main/org/jboss/jms/client/container and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 18 18:47:53 EDT 2011
Author: jbertram
Date: 2011-05-18 18:47:53 -0400 (Wed, 18 May 2011)
New Revision: 8309
Modified:
branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1825_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1839_JBMESSAGING-1842_JBMESSAGING-1850_JBMESSAGING-1855/
branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1825_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1839_JBMESSAGING-1842_JBMESSAGING-1850_JBMESSAGING-1855/src/main/org/jboss/jms/client/container/ClientConsumer.java
Log:
JBPAPP-6531
Property changes on: branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1825_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1839_JBMESSAGING-1842_JBMESSAGING-1850_JBMESSAGING-1855
___________________________________________________________________
Modified: svn:mergeinfo
- /branches/Branch_1_4:8151,8254-8255
+ /branches/Branch_1_4:8151,8237,8254-8255,8257
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1825_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1839_JBMESSAGING-1842_JBMESSAGING-1850_JBMESSAGING-1855/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1825_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1839_JBMESSAGING-1842_JBMESSAGING-1850_JBMESSAGING-1855/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-05-18 08:30:10 UTC (rev 8308)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1825_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1839_JBMESSAGING-1842_JBMESSAGING-1850_JBMESSAGING-1855/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-05-18 22:47:53 UTC (rev 8309)
@@ -24,7 +24,13 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
@@ -289,6 +295,7 @@
private int consumeCount;
private boolean firstTime = true;
private volatile Thread onMessageThread;
+ private ExecutorService pool = Executors.newCachedThreadPool();
private boolean abortReceive;
public int getBufferSize()
@@ -459,6 +466,8 @@
this.listener = null;
}
+
+ pool.shutdownNow();
if (trace) { log.trace(this + " closed"); }
}
@@ -546,25 +555,80 @@
if (!isConnectionConsumer && !ignore)
{
- DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
-
- sessionDelegate.preDeliver(info);
+ final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
- //If post deliver didn't succeed and acknowledgement mode is auto_ack
- //That means the ref wasn't acked since it couldn't be found.
- //In order to maintain at most once semantics we must therefore not return
- //the message
-
- ignore = !sessionDelegate.postDeliver();
-
+ if (timeout <= 0)
+ {
+ sessionDelegate.preDeliver(info);
+
+ // If post deliver didn't succeed and acknowledgement mode is auto_ack
+ // That means the ref wasn't acked since it couldn't be found.
+ // In order to maintain at most once semantics we must therefore not return
+ // the message
+
+ ignore = !sessionDelegate.postDeliver();
+
+ }
+ else
+ {
+ //JBMESSAGING-1850
+ Callable<Boolean> afterReceive = new Callable<Boolean>()
+ {
+ public Boolean call() throws Exception
+ {
+ sessionDelegate.preDeliver(info);
+
+ // If post deliver didn't succeed and acknowledgement mode is auto_ack
+ // That means the ref wasn't acked since it couldn't be found.
+ // In order to maintain at most once semantics we must therefore not return
+ // the message
+
+ return !sessionDelegate.postDeliver();
+ }
+ };
+
+ java.util.concurrent.Future<Boolean> f = pool.submit(afterReceive);
+
+ long tmUsed = System.currentTimeMillis() - startTimestamp;
+
+ if (tmUsed >= timeout)
+ {
+ log.warn("Timed out before post message processing, discarding message " + m);
+ throw new JMSException("Timed out before post message processing, discarding message " + m);
+ }
+
+ try
+ {
+ ignore = f.get(timeout - tmUsed, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("Interrupted during getting future result.", e);
+ }
+ catch (ExecutionException e)
+ {
+ log.warn("received application exception.", e.getCause());
+ Throwable t = e.getCause();
+ if (t instanceof JMSException)
+ {
+ throw (JMSException)t;
+ }
+ }
+ catch (TimeoutException e)
+ {
+ log.warn("Timed out waiting for post message processing, discarding message " + m);
+ throw new JMSException("Timed out waiting for post message processing, discarding message " + m);
+ }
+ }
+
if (trace)
{
- log.trace("Post deliver returned " + !ignore);
+ log.trace("Post deliver returned " + !ignore);
}
-
+
if (!ignore)
{
- m.incDeliveryCount();
+ m.incDeliveryCount();
}
}
More information about the jboss-cvs-commits mailing list