[jboss-cvs] JBoss Messaging SVN: r8309 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1825_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1839_JBMESSAGING-1842_JBMESSAGING-1850_JBMESSAGING-1855: src/main/org/jboss/jms/client/container and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 18 18:47:53 EDT 2011


Author: jbertram
Date: 2011-05-18 18:47:53 -0400 (Wed, 18 May 2011)
New Revision: 8309

Modified:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1825_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1839_JBMESSAGING-1842_JBMESSAGING-1850_JBMESSAGING-1855/
   branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1825_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1839_JBMESSAGING-1842_JBMESSAGING-1850_JBMESSAGING-1855/src/main/org/jboss/jms/client/container/ClientConsumer.java
Log:
JBPAPP-6531


Property changes on: branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1825_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1839_JBMESSAGING-1842_JBMESSAGING-1850_JBMESSAGING-1855
___________________________________________________________________
Modified: svn:mergeinfo
   - /branches/Branch_1_4:8151,8254-8255
   + /branches/Branch_1_4:8151,8237,8254-8255,8257

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1825_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1839_JBMESSAGING-1842_JBMESSAGING-1850_JBMESSAGING-1855/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1825_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1839_JBMESSAGING-1842_JBMESSAGING-1850_JBMESSAGING-1855/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-05-18 08:30:10 UTC (rev 8308)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1825_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1839_JBMESSAGING-1842_JBMESSAGING-1850_JBMESSAGING-1855/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-05-18 22:47:53 UTC (rev 8309)
@@ -24,7 +24,13 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
@@ -289,6 +295,7 @@
    private int consumeCount;
    private boolean firstTime = true;
    private volatile Thread onMessageThread;
+   private ExecutorService pool = Executors.newCachedThreadPool();
    private boolean abortReceive;
 
    public int getBufferSize()
@@ -459,6 +466,8 @@
          
          this.listener = null;
       }
+      
+      pool.shutdownNow();
                            
       if (trace) { log.trace(this + " closed"); }
    }
@@ -546,25 +555,80 @@
                
                if (!isConnectionConsumer && !ignore)
                {
-                  DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
-                                                    
-                  sessionDelegate.preDeliver(info);                  
+                  final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
                   
-                  //If post deliver didn't succeed and acknowledgement mode is auto_ack
-                  //That means the ref wasn't acked since it couldn't be found.
-                  //In order to maintain at most once semantics we must therefore not return
-                  //the message
-                  
-                  ignore = !sessionDelegate.postDeliver();  
-                  
+                  if (timeout <= 0)
+                  {
+                     sessionDelegate.preDeliver(info);
+
+                     // If post deliver didn't succeed and acknowledgement mode is auto_ack
+                     // That means the ref wasn't acked since it couldn't be found.
+                     // In order to maintain at most once semantics we must therefore not return
+                     // the message
+
+                     ignore = !sessionDelegate.postDeliver();
+                     
+                  }
+                  else
+                  {
+                     //JBMESSAGING-1850
+                     Callable<Boolean> afterReceive = new Callable<Boolean>()
+                     {
+                        public Boolean call() throws Exception
+                        {
+                           sessionDelegate.preDeliver(info);
+
+                           // If post deliver didn't succeed and acknowledgement mode is auto_ack
+                           // That means the ref wasn't acked since it couldn't be found.
+                           // In order to maintain at most once semantics we must therefore not return
+                           // the message
+
+                           return !sessionDelegate.postDeliver();
+                        }
+                     };
+
+                     java.util.concurrent.Future<Boolean> f = pool.submit(afterReceive);
+
+                     long tmUsed = System.currentTimeMillis() - startTimestamp;
+                     
+                     if (tmUsed >= timeout)
+                     {
+                        log.warn("Timed out before post message processing, discarding message " + m);
+                        throw new JMSException("Timed out before post message processing, discarding message " + m);
+                     }
+
+                     try
+                     {
+                        ignore = f.get(timeout - tmUsed, TimeUnit.MILLISECONDS);
+                     }
+                     catch (InterruptedException e)
+                     {
+                        log.warn("Interrupted during getting future result.", e);
+                     }
+                     catch (ExecutionException e)
+                     {
+                        log.warn("received application exception.", e.getCause());
+                        Throwable t = e.getCause();
+                        if (t instanceof JMSException)
+                        {
+                           throw (JMSException)t;
+                        }
+                     }
+                     catch (TimeoutException e)
+                     {
+                        log.warn("Timed out waiting for post message processing, discarding message " + m);
+                        throw new JMSException("Timed out waiting for post message processing, discarding message " + m);
+                     }
+                  }
+
                   if (trace)
                   {
-                  	log.trace("Post deliver returned " + !ignore);
+                     log.trace("Post deliver returned " + !ignore);
                   }
-                  
+
                   if (!ignore)
                   {
-                     m.incDeliveryCount();                                
+                     m.incDeliveryCount();
                   }
                }
                                              



More information about the jboss-cvs-commits mailing list