[jboss-cvs] JBoss Messaging SVN: r8300 - branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri May 13 10:19:53 EDT 2011


Author: dgrove_redhat.com
Date: 2011-05-13 10:19:52 -0400 (Fri, 13 May 2011)
New Revision: 8300

Modified:
   branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ClientConsumer.java
Log:
Merged JBMESSAGING-1850 into SOA-3026 branch.

Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-05-13 14:01:27 UTC (rev 8299)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-05-13 14:19:52 UTC (rev 8300)
@@ -24,7 +24,13 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
@@ -289,6 +295,7 @@
    private int consumeCount;
    private boolean firstTime = true;
    private volatile Thread onMessageThread;
+   private ExecutorService pool = Executors.newCachedThreadPool();
 
    public int getBufferSize()
    {
@@ -458,6 +465,8 @@
          
          this.listener = null;
       }
+      
+      pool.shutdownNow();
                            
       if (trace) { log.trace(this + " closed"); }
    }
@@ -545,25 +554,80 @@
                
                if (!isConnectionConsumer && !ignore)
                {
-                  DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
-                                                    
-                  sessionDelegate.preDeliver(info);                  
+                  final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
                   
-                  //If post deliver didn't succeed and acknowledgement mode is auto_ack
-                  //That means the ref wasn't acked since it couldn't be found.
-                  //In order to maintain at most once semantics we must therefore not return
-                  //the message
-                  
-                  ignore = !sessionDelegate.postDeliver();  
-                  
+                  if (timeout <= 0)
+                  {
+                     sessionDelegate.preDeliver(info);
+
+                     // If post deliver didn't succeed and acknowledgement mode is auto_ack
+                     // That means the ref wasn't acked since it couldn't be found.
+                     // In order to maintain at most once semantics we must therefore not return
+                     // the message
+
+                     ignore = !sessionDelegate.postDeliver();
+                     
+                  }
+                  else
+                  {
+                     //JBMESSAGING-1850
+                     Callable<Boolean> afterReceive = new Callable<Boolean>()
+                     {
+                        public Boolean call() throws Exception
+                        {
+                           sessionDelegate.preDeliver(info);
+
+                           // If post deliver didn't succeed and acknowledgement mode is auto_ack
+                           // That means the ref wasn't acked since it couldn't be found.
+                           // In order to maintain at most once semantics we must therefore not return
+                           // the message
+
+                           return !sessionDelegate.postDeliver();
+                        }
+                     };
+
+                     java.util.concurrent.Future<Boolean> f = pool.submit(afterReceive);
+
+                     long tmUsed = System.currentTimeMillis() - startTimestamp;
+                     
+                     if (tmUsed >= timeout)
+                     {
+                        log.warn("Timed out before post message processing, discarding message " + m);
+                        throw new JMSException("Timed out before post message processing, discarding message " + m);
+                     }
+
+                     try
+                     {
+                        ignore = f.get(timeout - tmUsed, TimeUnit.MILLISECONDS);
+                     }
+                     catch (InterruptedException e)
+                     {
+                        log.warn("Interrupted during getting future result.", e);
+                     }
+                     catch (ExecutionException e)
+                     {
+                        log.warn("received application exception.", e.getCause());
+                        Throwable t = e.getCause();
+                        if (t instanceof JMSException)
+                        {
+                           throw (JMSException)t;
+                        }
+                     }
+                     catch (TimeoutException e)
+                     {
+                        log.warn("Timed out waiting for post message processing, discarding message " + m);
+                        throw new JMSException("Timed out waiting for post message processing, discarding message " + m);
+                     }
+                  }
+
                   if (trace)
                   {
-                  	log.trace("Post deliver returned " + !ignore);
+                     log.trace("Post deliver returned " + !ignore);
                   }
-                  
+
                   if (!ignore)
                   {
-                     m.incDeliveryCount();                                
+                     m.incDeliveryCount();
                   }
                }
                                              



More information about the jboss-cvs-commits mailing list