[hornetq-commits] JBoss hornetq SVN: r11147 - in branches/Branch_2_2_EAP_cluster_clean2: src/main/org/hornetq/core/config and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Aug 8 12:04:33 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-08 12:04:32 -0400 (Mon, 08 Aug 2011)
New Revision: 11147

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
more changes to my branch

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2011-08-08 15:45:41 UTC (rev 11146)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2011-08-08 16:04:32 UTC (rev 11147)
@@ -33,7 +33,9 @@
 
    private final int windowSize;
 
-      private boolean blocked;
+   private volatile boolean closed;
+   
+   private boolean blocked;
 
    private final SimpleString address;
 
@@ -68,15 +70,18 @@
 
       if (!semaphore.tryAcquire(credits))
       {
-         this.blocked = true;
-         try
+         if (!closed)
          {
-            semaphore.acquire(credits);
+            this.blocked = true;
+            try
+            {
+               semaphore.acquire(credits);
+            }
+            finally
+            {
+               this.blocked = false;
+            }
          }
-         finally
-         {
-            this.blocked = false;
-         }
       }
    }
 
@@ -118,6 +123,7 @@
    public void close()
    {
       // Closing a producer that is blocking should make it return
+      closed = true;
 
       semaphore.release(Integer.MAX_VALUE / 2);
    }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-08-08 15:45:41 UTC (rev 11146)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-08-08 16:04:32 UTC (rev 11147)
@@ -133,6 +133,8 @@
    private final Map<Long, ClientConsumerInternal> consumers = new LinkedHashMap<Long, ClientConsumerInternal>();
 
    private volatile boolean closed;
+   
+   private volatile boolean closing;
 
    private final boolean autoCommitAcks;
 
@@ -857,6 +859,11 @@
          log.debug("Calling close on session "  + this);
       }
 
+      synchronized (this)
+      {
+         closing = true;
+      }
+
       try
       {
          producerCreditManager.close();
@@ -905,7 +912,7 @@
    {
       synchronized (this)
       {
-         if (closed)
+         if (closed/* || closing*/)
          {
             return;
          }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java	2011-08-08 15:45:41 UTC (rev 11146)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java	2011-08-08 16:04:32 UTC (rev 11147)
@@ -16,6 +16,7 @@
 import java.io.Serializable;
 import java.util.List;
 
+import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 
 /**
@@ -48,6 +49,8 @@
    private final long maxRetryInterval;
    
    private final int reconnectAttempts;
+   
+   private final long callTimeout;
 
    private final boolean duplicateDetection;
 
@@ -83,6 +86,7 @@
            ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
            ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
            ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
+           HornetQClient.DEFAULT_CALL_TIMEOUT,
            duplicateDetection,
            forwardWhenNoConsumers,
            maxHops,
@@ -101,6 +105,7 @@
                                          final double retryIntervalMultiplier,
                                          final long maxRetryInterval,
                                          final int reconnectAttempts,
+                                         final long callTimeout,
                                          final boolean duplicateDetection,
                                          final boolean forwardWhenNoConsumers,
                                          final int maxHops,
@@ -119,6 +124,7 @@
       this.reconnectAttempts = reconnectAttempts;
       this.staticConnectors = staticConnectors;
       this.duplicateDetection = duplicateDetection;
+      this.callTimeout = callTimeout;
       this.forwardWhenNoConsumers = forwardWhenNoConsumers;
       discoveryGroupName = null;
       this.maxHops = maxHops;
@@ -146,6 +152,7 @@
            ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
            ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
            ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
+           HornetQClient.DEFAULT_CALL_TIMEOUT,
            duplicateDetection,
            forwardWhenNoConsumers,
            maxHops,
@@ -163,6 +170,7 @@
                                          final double retryIntervalMultiplier,
                                          final long maxRetryInterval,
                                          final int reconnectAttempts,
+                                         final long callTimeout,
                                          final boolean duplicateDetection,
                                          final boolean forwardWhenNoConsumers,
                                          final int maxHops,
@@ -178,6 +186,7 @@
       this.retryIntervalMultiplier = retryIntervalMultiplier;
       this.maxRetryInterval = maxRetryInterval;
       this.reconnectAttempts = reconnectAttempts;
+      this.callTimeout = callTimeout;
       this.duplicateDetection = duplicateDetection;
       this.forwardWhenNoConsumers = forwardWhenNoConsumers;
       this.discoveryGroupName = discoveryGroupName;
@@ -237,6 +246,11 @@
       return reconnectAttempts;
    }
    
+   public long getCallTimeout()
+   {
+      return callTimeout;
+   }
+   
    public String getConnectorName()
    {
       return connectorName;

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java	2011-08-08 15:45:41 UTC (rev 11146)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java	2011-08-08 16:04:32 UTC (rev 11147)
@@ -1015,6 +1015,8 @@
                                                         "retry-interval",
                                                         ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL,
                                                         Validators.GT_ZERO);
+      
+      long callTimeout = XMLConfigurationUtil.getLong(e, "call-timeout", HornetQClient.DEFAULT_CALL_TIMEOUT, Validators.GT_ZERO);
                                                         
       double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(e, "retry-interval-multiplier", 
                                                                       ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER, Validators.GT_ZERO);
@@ -1069,6 +1071,7 @@
                                                      retryIntervalMultiplier,
                                                      maxRetryInterval,
                                                      reconnectAttempts,
+                                                     callTimeout,
                                                      duplicateDetection,
                                                      forwardWhenNoConsumers,
                                                      maxHops,
@@ -1087,6 +1090,7 @@
                                                      retryIntervalMultiplier,
                                                      maxRetryInterval,
                                                      reconnectAttempts,
+                                                     callTimeout,
                                                      duplicateDetection,
                                                      forwardWhenNoConsumers,
                                                      maxHops,

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-08 15:45:41 UTC (rev 11146)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-08 16:04:32 UTC (rev 11147)
@@ -95,6 +95,8 @@
    private final long connectionTTL;
 
    private final long retryInterval;
+   
+   private final long callTimeout;
 
    private final double retryIntervalMultiplier;
 
@@ -148,6 +150,7 @@
                                 final double retryIntervalMultiplier,
                                 final long maxRetryInterval,
                                 final int reconnectAttempts,
+                                final long callTimeout,
                                 final boolean useDuplicateDetection,
                                 final boolean routeWhenNoConsumers,
                                 final int confirmationWindowSize,
@@ -218,6 +221,8 @@
       this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
 
       this.manager = manager;
+      
+      this.callTimeout = callTimeout;
 
       this.clusterManagerTopology = clusterManagerTopology;
 
@@ -247,6 +252,7 @@
                                 final double retryIntervalMultiplier,
                                 final long maxRetryInterval,
                                 final int reconnectAttempts,
+                                final long callTimeout,
                                 final boolean useDuplicateDetection,
                                 final boolean routeWhenNoConsumers,
                                 final int confirmationWindowSize,
@@ -287,6 +293,8 @@
       this.maxRetryInterval = maxRetryInterval;
 
       this.reconnectAttempts = reconnectAttempts;
+      
+      this.callTimeout = callTimeout;
 
       this.useDuplicateDetection = useDuplicateDetection;
 
@@ -456,37 +464,26 @@
 
       if (serverLocator != null)
       {
+
+         if (!useDuplicateDetection)
+         {
+            log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
+         }
+
          serverLocator.setNodeID(nodeUUID.toString());
          serverLocator.setIdentity("(main-ClusterConnection::" + server.toString() + ")");
-
          serverLocator.setReconnectAttempts(0);
-
          serverLocator.setClusterConnection(true);
          serverLocator.setClusterTransportConfiguration(connector);
          serverLocator.setBackup(server.getConfiguration().isBackup());
          serverLocator.setInitialConnectAttempts(-1);
-
          serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
          serverLocator.setConnectionTTL(connectionTTL);
-
-         if (confirmationWindowSize < 0)
-         {
-            // We can't have confirmationSize = -1 on the cluster Bridge
-            // Otherwise we won't have confirmation working
-            serverLocator.setConfirmationWindowSize(0);
-         }
-         else
-         {
-            serverLocator.setConfirmationWindowSize(confirmationWindowSize);
-         }
-
-         if (!useDuplicateDetection)
-         {
-            log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
-         }
+         serverLocator.setConfirmationWindowSize(confirmationWindowSize);
          // if not using duplicate detection, we will send blocked
          serverLocator.setBlockOnDurableSend(!useDuplicateDetection);
          serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
+         serverLocator.setCallTimeout(callTimeout);
 
          if (retryInterval > 0)
          {
@@ -702,7 +699,7 @@
       targetLocator.setConnectionTTL(connectionTTL);
       targetLocator.setInitialConnectAttempts(0);
 
-      targetLocator.setConfirmationWindowSize(serverLocator.getConfirmationWindowSize());
+      targetLocator.setConfirmationWindowSize(confirmationWindowSize);
       targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
       targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
       targetLocator.setClusterConnection(true);

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-08 15:45:41 UTC (rev 11146)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-08 16:04:32 UTC (rev 11147)
@@ -808,6 +808,7 @@
                                                        config.getRetryIntervalMultiplier(),
                                                        config.getMaxRetryInterval(),
                                                        config.getReconnectAttempts(),
+                                                       config.getCallTimeout(),
                                                        config.isDuplicateDetection(),
                                                        config.isForwardWhenNoConsumers(),
                                                        config.getConfirmationWindowSize(),
@@ -830,7 +831,7 @@
          
          if (log.isDebugEnabled())
          {
-            log.debug("XXX " + this + " defining cluster connection towards " + Arrays.toString(tcConfigs));
+            log.debug(this + " defining cluster connection towards " + Arrays.toString(tcConfigs));
          }
 
          clusterConnection = new ClusterConnectionImpl(this,
@@ -845,6 +846,7 @@
                                                        config.getRetryIntervalMultiplier(),
                                                        config.getMaxRetryInterval(),
                                                        config.getReconnectAttempts(),
+                                                       config.getCallTimeout(),
                                                        config.isDuplicateDetection(),
                                                        config.isForwardWhenNoConsumers(),
                                                        config.getConfirmationWindowSize(),

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-08 15:45:41 UTC (rev 11146)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-08 16:04:32 UTC (rev 11147)
@@ -1883,6 +1883,7 @@
            ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
            ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
            reconnectAttempts,
+           1000,
            true,
            forwardWhenNoConsumers,
            maxHops,

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java	2011-08-08 15:45:41 UTC (rev 11146)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java	2011-08-08 16:04:32 UTC (rev 11147)
@@ -14,6 +14,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -29,7 +30,6 @@
 import org.hornetq.api.jms.HornetQJMSClient;
 import org.hornetq.api.jms.JMSFactoryType;
 import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
 import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
@@ -58,6 +58,8 @@
    private JMSServerManagerImpl serverManager;
 
    private InVMContext initialContext;
+   
+   private AtomicInteger countErrors = new AtomicInteger();
 
    private final String topicName = "my-topic";
 
@@ -162,11 +164,20 @@
                                             "/cf");
    }
 
-   public void testFoo()
+   public void testFlood() throws Throwable
    {
+      try
+      {
+         internalTestFlood();
+      }
+      catch (Throwable e)
+      {
+         log.error(e.getMessage(), e);
+         throw e;
+      }
    }
-
-   public void _testFlood() throws Exception
+   
+   public void internalTestFlood() throws Exception
    {
       ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/cf");
 
@@ -174,26 +185,30 @@
 
       final int numConsumers = 20;
 
-      final int numMessages = 10000;
+      final int numMessages = 1000;
 
       ProducerThread[] producers = new ProducerThread[numProducers];
+      
+      Connection conn = cf.createConnection();
 
       for (int i = 0; i < numProducers; i++)
       {
-         producers[i] = new ProducerThread(cf, numMessages);
+         producers[i] = new ProducerThread(conn, numMessages);
       }
 
       ConsumerThread[] consumers = new ConsumerThread[numConsumers];
 
       for (int i = 0; i < numConsumers; i++)
       {
-         consumers[i] = new ConsumerThread(cf, numMessages);
+         consumers[i] = new ConsumerThread(conn, numMessages);
       }
 
       for (int i = 0; i < numConsumers; i++)
       {
          consumers[i].start();
       }
+      
+      conn.start();
 
       for (int i = 0; i < numProducers; i++)
       {
@@ -209,6 +224,10 @@
       {
          producers[i].join();
       }
+      
+      conn.close();
+      
+      assertEquals(0, countErrors.get());
 
    }
 
@@ -216,23 +235,16 @@
    {
       private final Connection connection;
 
-      private final Session session;
+      private Session session;
 
-      private final MessageProducer producer;
+      private MessageProducer producer;
 
       private final int numMessages;
 
-      ProducerThread(final ConnectionFactory cf, final int numMessages) throws Exception
+      ProducerThread(final Connection connection, final int numMessages) throws Exception
       {
-         connection = cf.createConnection();
-
-         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         producer = session.createProducer(HornetQJMSClient.createTopic("my-topic"));
-
-         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
          this.numMessages = numMessages;
+         this.connection = connection;
       }
 
       @Override
@@ -240,6 +252,12 @@
       {
          try
          {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            producer = session.createProducer(HornetQJMSClient.createTopic("my-topic"));
+
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
             byte[] bytes = new byte[1000];
 
             BytesMessage message = session.createBytesMessage();
@@ -255,12 +273,13 @@
                // log.info("Producer " + this + " sent " + i);
                // }
             }
-
-            connection.close();
+            
+            session.close();
          }
-         catch (Exception e)
+         catch (Throwable e)
          {
-            e.printStackTrace();
+            e.printStackTrace(System.out);
+            countErrors.incrementAndGet();
          }
       }
    }
@@ -269,22 +288,16 @@
    {
       private final Connection connection;
 
-      private final Session session;
+      private Session session;
 
-      private final MessageConsumer consumer;
+      private MessageConsumer consumer;
 
       private final int numMessages;
 
-      ConsumerThread(final ConnectionFactory cf, final int numMessages) throws Exception
+      ConsumerThread(final Connection conn, final int numMessages) throws Exception
       {
-         connection = cf.createConnection();
+         this.connection = conn;
 
-         connection.start();
-
-         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         consumer = session.createConsumer(HornetQJMSClient.createTopic("my-topic"));
-
          this.numMessages = numMessages;
       }
 
@@ -293,13 +306,18 @@
       {
          try
          {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            consumer = session.createConsumer(HornetQJMSClient.createTopic("my-topic"));
+
             for (int i = 0; i < numMessages; i++)
             {
-               Message msg = consumer.receive();
+               Message msg = consumer.receive(5000);
 
                if (msg == null)
                {
                   FloodServerTest.log.error("message is null");
+                  countErrors.incrementAndGet();
                   break;
                }
 
@@ -309,11 +327,12 @@
                // }
             }
 
-            connection.close();
+            session.close();
          }
-         catch (Exception e)
+         catch (Throwable e)
          {
-            e.printStackTrace();
+            e.printStackTrace(System.out);
+            countErrors.incrementAndGet();
          }
       }
    }

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-08-08 15:45:41 UTC (rev 11146)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-08-08 16:04:32 UTC (rev 11147)
@@ -954,7 +954,7 @@
       
       if (failed)
       {
-         logAndSystemOut("Thread leaged on test " + this.getClass().getName() + "::" + 
+         logAndSystemOut("Thread leaked on test " + this.getClass().getName() + "::" + 
                          this.getName() + "\n" + buffer.toString());
          fail("Thread leakage");
       }
@@ -1014,6 +1014,19 @@
          fail("invm registry still had acceptors registered");
       }
 
+      long timeout = System.currentTimeMillis() + 10000;
+      
+      while (AsynchronousFileImpl.getTotalMaxIO() != 0 && System.currentTimeMillis() > timeout)
+      {
+         try
+         {
+            Thread.sleep(500);
+         }
+         catch (Exception ignored)
+         {
+         }
+      }
+      
       if (AsynchronousFileImpl.getTotalMaxIO() != 0)
       {
          AsynchronousFileImpl.resetMaxAIO();



More information about the hornetq-commits mailing list