[jboss-cvs] JBoss Messaging SVN: r8237 - branches/Branch_1_4/src/main/org/jboss/jms/client/container.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Mar 14 23:45:18 EDT 2011


Author: gaohoward
Date: 2011-03-14 23:45:18 -0400 (Mon, 14 Mar 2011)
New Revision: 8237

Modified:
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
Log:
JBMESSAGING-1850



Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-03-03 10:26:05 UTC (rev 8236)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-03-15 03:45:18 UTC (rev 8237)
@@ -24,7 +24,13 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
@@ -289,6 +295,7 @@
    private int consumeCount;
    private boolean firstTime = true;
    private volatile Thread onMessageThread;
+   private ExecutorService pool = Executors.newFixedThreadPool(1);
 
    public int getBufferSize()
    {
@@ -545,25 +552,80 @@
                
                if (!isConnectionConsumer && !ignore)
                {
-                  DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
-                                                    
-                  sessionDelegate.preDeliver(info);                  
+                  final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
                   
-                  //If post deliver didn't succeed and acknowledgement mode is auto_ack
-                  //That means the ref wasn't acked since it couldn't be found.
-                  //In order to maintain at most once semantics we must therefore not return
-                  //the message
-                  
-                  ignore = !sessionDelegate.postDeliver();  
-                  
+                  if (timeout <= 0)
+                  {
+                     sessionDelegate.preDeliver(info);
+
+                     // If post deliver didn't succeed and acknowledgement mode is auto_ack
+                     // That means the ref wasn't acked since it couldn't be found.
+                     // In order to maintain at most once semantics we must therefore not return
+                     // the message
+
+                     ignore = !sessionDelegate.postDeliver();
+                     
+                  }
+                  else
+                  {
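+                     // JBMESSAGING-1850: run preDeliver/postDeliver on the worker pool so this thread can stop waiting once the timeout has elapsed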
+                     Callable<Boolean> afterReceive = new Callable<Boolean>()
+                     {
+                        public Boolean call() throws Exception
+                        {
+                           sessionDelegate.preDeliver(info);
+
+                           // If post deliver didn't succeed and acknowledgement mode is auto_ack
+                           // That means the ref wasn't acked since it couldn't be found.
+                           // In order to maintain at most once semantics we must therefore not return
+                           // the message
+
+                           return !sessionDelegate.postDeliver();
+                        }
+                     };
+
+                     java.util.concurrent.Future<Boolean> f = pool.submit(afterReceive);
+
+                     long tmUsed = System.currentTimeMillis() - startTimestamp;
+                     
+                     if (tmUsed >= timeout)
+                     {
+                        log.warn("Timed out before post message processing, discarding message " + m);
+                        return null;
+                     }
+
+                     try
+                     {
+                        ignore = f.get(timeout - tmUsed, TimeUnit.MILLISECONDS);
+                     }
+                     catch (InterruptedException e)
+                     {
+                        log.warn("Interrupted during getting future result.", e);
+                     }
+                     catch (ExecutionException e)
+                     {
+                        log.warn("received application exception.", e.getCause());
+                        Throwable t = e.getCause();
+                        if (t instanceof JMSException)
+                        {
+                           throw (JMSException)t;
+                        }
+                     }
+                     catch (TimeoutException e)
+                     {
+                        log.warn("Timed out waiting for post message processing, discarding message " + m);
+                        return null;
+                     }
+                  }
+
                   if (trace)
                   {
-                  	log.trace("Post deliver returned " + !ignore);
+                     log.trace("Post deliver returned " + !ignore);
                   }
-                  
+
                   if (!ignore)
                   {
-                     m.incDeliveryCount();                                
+                     m.incDeliveryCount();
                   }
                }
                                              



More information about the jboss-cvs-commits mailing list