[jboss-cvs] JBoss Messaging SVN: r8282 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851: src/main/org/jboss/jms/client/container and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Apr 28 14:29:12 EDT 2011


Author: jbertram
Date: 2011-04-28 14:29:11 -0400 (Thu, 28 Apr 2011)
New Revision: 8282

Modified:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/
   branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java
Log:
JBPAPP-6159


Property changes on: branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851
___________________________________________________________________
Modified: svn:mergeinfo
   - /branches/Branch_1_4:8238
   + /branches/Branch_1_4:8237-8238,8245,8257

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-04-28 15:07:52 UTC (rev 8281)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-04-28 18:29:11 UTC (rev 8282)
@@ -24,7 +24,13 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
@@ -289,6 +295,7 @@
    private int consumeCount;
    private boolean firstTime = true;
    private volatile Thread onMessageThread;
+   private ExecutorService pool = Executors.newCachedThreadPool();
    private long maxRetryChangeRate;
    private long retryChangeRateInterval;
 
@@ -464,6 +471,8 @@
          
          this.listener = null;
       }
+      
+      pool.shutdownNow();
                            
       if (trace) { log.trace(this + " closed"); }
    }
@@ -551,25 +560,80 @@
                
                if (!isConnectionConsumer && !ignore)
                {
-                  DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
-                                                    
-                  sessionDelegate.preDeliver(info);                  
+                  final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
                   
-                  //If post deliver didn't succeed and acknowledgement mode is auto_ack
-                  //That means the ref wasn't acked since it couldn't be found.
-                  //In order to maintain at most once semantics we must therefore not return
-                  //the message
-                  
-                  ignore = !sessionDelegate.postDeliver();  
-                  
+                  if (timeout <= 0)
+                  {
+                     sessionDelegate.preDeliver(info);
+
+                     // If post deliver didn't succeed and acknowledgement mode is auto_ack
+                     // That means the ref wasn't acked since it couldn't be found.
+                     // In order to maintain at most once semantics we must therefore not return
+                     // the message
+
+                     ignore = !sessionDelegate.postDeliver();
+                     
+                  }
+                  else
+                  {
+                     //JBMESSAGING-1850
+                     Callable<Boolean> afterReceive = new Callable<Boolean>()
+                     {
+                        public Boolean call() throws Exception
+                        {
+                           sessionDelegate.preDeliver(info);
+
+                           // If post deliver didn't succeed and acknowledgement mode is auto_ack
+                           // That means the ref wasn't acked since it couldn't be found.
+                           // In order to maintain at most once semantics we must therefore not return
+                           // the message
+
+                           return !sessionDelegate.postDeliver();
+                        }
+                     };
+
+                     java.util.concurrent.Future<Boolean> f = pool.submit(afterReceive);
+
+                     long tmUsed = System.currentTimeMillis() - startTimestamp;
+                     
+                     if (tmUsed >= timeout)
+                     {
+                        log.warn("Timed out before post message processing, discarding message " + m);
+                        throw new JMSException("Timed out before post message processing, discarding message " + m);
+                     }
+
+                     try
+                     {
+                        ignore = f.get(timeout - tmUsed, TimeUnit.MILLISECONDS);
+                     }
+                     catch (InterruptedException e)
+                     {
+                        log.warn("Interrupted during getting future result.", e);
+                     }
+                     catch (ExecutionException e)
+                     {
+                        log.warn("received application exception.", e.getCause());
+                        Throwable t = e.getCause();
+                        if (t instanceof JMSException)
+                        {
+                           throw (JMSException)t;
+                        }
+                     }
+                     catch (TimeoutException e)
+                     {
+                        log.warn("Timed out waiting for post message processing, discarding message " + m);
+                        throw new JMSException("Timed out waiting for post message processing, discarding message " + m);
+                     }
+                  }
+
                   if (trace)
                   {
-                  	log.trace("Post deliver returned " + !ignore);
+                     log.trace("Post deliver returned " + !ignore);
                   }
-                  
+
                   if (!ignore)
                   {
-                     m.incDeliveryCount();                                
+                     m.incDeliveryCount();
                   }
                }
                                              

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2011-04-28 15:07:52 UTC (rev 8281)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2011-04-28 18:29:11 UTC (rev 8282)
@@ -383,6 +383,10 @@
       strictTck = in.readBoolean();
       
       sendAcksAsync = in.readBoolean();
+      
+      maxRetryChangeRate = in.readLong();
+      
+      retryChangeRateInterval = in.readLong();
    }
 
    public void write(DataOutputStream out) throws Exception
@@ -400,6 +404,10 @@
       out.writeBoolean(strictTck);
       
       out.writeBoolean(sendAcksAsync);
+      
+      out.writeLong(this.maxRetryChangeRate);
+      
+      out.writeLong(this.retryChangeRateInterval);
    }
 
    /**

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java	2011-04-28 15:07:52 UTC (rev 8281)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java	2011-04-28 18:29:11 UTC (rev 8282)
@@ -23,10 +23,15 @@
 package org.jboss.test.messaging.jms.clustering;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.management.ObjectName;
 
+import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossConnectionFactory;
 import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.exception.MessagingNetworkFailureException;
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
@@ -217,6 +222,181 @@
       
    }
 
+   //https://issues.jboss.org/browse/JBMESSAGING-1851
+   public void testChangeRateConfigSettings() throws Exception
+   {
+      Connection c = null;
+
+      try
+      {
+         String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + "       name=\"jboss.messaging.connectionfactory:service=TestChangeRateConfigSettingsFactory\"\n"
+                              + "       xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+                              + "       <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+                              + "       <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+                              + "       <attribute name=\"JNDIBindings\">\n"
+                              + "          <bindings>\n"
+                              + "            <binding>/ClusteredTestChangeRateConfigSettingsFactory</binding>\n"
+                              + "          </bindings>\n"
+                              + "       </attribute>\n"
+                              + "       <attribute name=\"SupportsFailover\">true</attribute>"
+                              + "       <attribute name=\"SupportsLoadBalancing\">true</attribute>"
+                              + "       <attribute name=\"MaxRetryChangeRate\">10</attribute>\n"
+                              + "       <attribute name=\"RetryChangeRateInterval\">2345</attribute>\n"
+                              + " </mbean>";
+
+         ObjectName on = ServerManagement.deploy(mbeanConfig);
+         ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+         ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+
+         ConnectionFactory cf = (ConnectionFactory)ic[0].lookup("/ClusteredTestChangeRateConfigSettingsFactory");
+         c = cf.createConnection();
+         
+         ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+         
+         ConnectionState state1 = (ConnectionState)del1.getState();
+         
+         long maxRetry = state1.getMaxRetryChangeRate();
+         
+         assertEquals(10, maxRetry);
+         
+         long retryInterval = state1.getRetryChangeRateInterval();
+         
+         assertEquals(2345, retryInterval);
+      }
+      finally
+      {
+         try
+         {
+            if (c != null)
+            {
+               log.info("Closing connection");
+               c.close();
+               log.info("Closed connection");
+            }
+         }
+         catch (Exception e)
+         {
+            log.warn(e.toString(), e);
+         }
+      }
+   }
+
+   //https://issues.jboss.org/browse/JBMESSAGING-1851
+   public void testChangeRateConfigSettings2() throws Exception
+   {
+      Connection c = null;
+
+      try
+      {
+         String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + "       name=\"jboss.messaging.connectionfactory:service=TestChangeRateConfigSettingsFactory\"\n"
+                              + "       xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+                              + "       <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+                              + "       <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+                              + "       <attribute name=\"JNDIBindings\">\n"
+                              + "          <bindings>\n"
+                              + "            <binding>/ClusteredTestChangeRateConfigSettingsFactory</binding>\n"
+                              + "          </bindings>\n"
+                              + "       </attribute>\n"
+                              + "       <attribute name=\"SupportsFailover\">false</attribute>"
+                              + "       <attribute name=\"SupportsLoadBalancing\">true</attribute>"
+                              + "       <attribute name=\"MaxRetryChangeRate\">10</attribute>\n"
+                              + "       <attribute name=\"RetryChangeRateInterval\">2345</attribute>\n"
+                              + " </mbean>";
+
+         ObjectName on = ServerManagement.deploy(mbeanConfig);
+         ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+         ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+
+         ConnectionFactory cf = (ConnectionFactory)ic[0].lookup("/ClusteredTestChangeRateConfigSettingsFactory");
+         c = cf.createConnection();
+         
+         ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+         
+         ConnectionState state1 = (ConnectionState)del1.getState();
+         
+         long maxRetry = state1.getMaxRetryChangeRate();
+         
+         assertEquals(10, maxRetry);
+         
+         long retryInterval = state1.getRetryChangeRateInterval();
+         
+         assertEquals(2345, retryInterval);
+      }
+      finally
+      {
+         try
+         {
+            if (c != null)
+            {
+               log.info("Closing connection");
+               c.close();
+               log.info("Closed connection");
+            }
+         }
+         catch (Exception e)
+         {
+            log.warn(e.toString(), e);
+         }
+      }
+   }
+
+   //https://issues.jboss.org/browse/JBMESSAGING-1851
+   public void testChangeRateConfigDefaults() throws Exception
+   {
+      Connection c = null;
+
+      try
+      {
+         String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + "       name=\"jboss.messaging.connectionfactory:service=TestChangeRateConfigDefaultsFactory\"\n"
+                              + "       xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+                              + "       <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+                              + "       <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+                              + "       <attribute name=\"JNDIBindings\">\n"
+                              + "          <bindings>\n"
+                              + "            <binding>/ClusteredTestChangeRateConfigDefaultsFactory</binding>\n"
+                              + "          </bindings>\n"
+                              + "       </attribute>\n"
+                              + "       <attribute name=\"SupportsFailover\">true</attribute>"
+                              + "       <attribute name=\"SupportsLoadBalancing\">true</attribute>"
+                              + " </mbean>";
+
+         ObjectName on = ServerManagement.deploy(mbeanConfig);
+         ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+         ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+
+         ConnectionFactory cf = (ConnectionFactory)ic[0].lookup("/ClusteredTestChangeRateConfigDefaultsFactory");
+         c = cf.createConnection();
+         
+         ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+         
+         ConnectionState state1 = (ConnectionState)del1.getState();
+         
+         long maxRetry = state1.getMaxRetryChangeRate();
+         
+         assertEquals(0, maxRetry);
+         
+         long retryInterval = state1.getRetryChangeRateInterval();
+         
+         assertEquals(5000, retryInterval);
+      }
+      finally
+      {
+         try
+         {
+            if (c != null)
+            {
+               log.info("Closing connection");
+               c.close();
+               log.info("Closed connection");
+            }
+         }
+         catch (Exception e)
+         {
+            log.warn(e.toString(), e);
+         }
+      }
+   }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------



More information about the jboss-cvs-commits mailing list