[hornetq-commits] JBoss hornetq SVN: r11154 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/config and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Aug 8 22:57:55 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-08 22:57:54 -0400 (Mon, 08 Aug 2011)
New Revision: 11154

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Several issues on this commit: https://issues.jboss.org/browse/HORNETQ-711, https://issues.jboss.org/browse/HORNETQ-716, https://issues.jboss.org/browse/HORNETQ-743 (should indirectly fix JBPAPP-6522)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2011-08-09 02:27:09 UTC (rev 11153)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2011-08-09 02:57:54 UTC (rev 11154)
@@ -33,7 +33,9 @@
 
    private final int windowSize;
 
-      private boolean blocked;
+   private volatile boolean closed;
+   
+   private boolean blocked;
 
    private final SimpleString address;
 
@@ -68,15 +70,18 @@
 
       if (!semaphore.tryAcquire(credits))
       {
-         this.blocked = true;
-         try
+         if (!closed)
          {
-            semaphore.acquire(credits);
+            this.blocked = true;
+            try
+            {
+               semaphore.acquire(credits);
+            }
+            finally
+            {
+               this.blocked = false;
+            }
          }
-         finally
-         {
-            this.blocked = false;
-         }
       }
    }
 
@@ -118,6 +123,7 @@
    public void close()
    {
       // Closing a producer that is blocking should make it return
+      closed = true;
 
       semaphore.release(Integer.MAX_VALUE / 2);
    }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-08-09 02:27:09 UTC (rev 11153)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-08-09 02:57:54 UTC (rev 11154)
@@ -133,6 +133,8 @@
    private final Map<Long, ClientConsumerInternal> consumers = new LinkedHashMap<Long, ClientConsumerInternal>();
 
    private volatile boolean closed;
+   
+   private volatile boolean closing;
 
    private final boolean autoCommitAcks;
 
@@ -857,6 +859,11 @@
          log.debug("Calling close on session "  + this);
       }
 
+      synchronized (this)
+      {
+         closing = true;
+      }
+
       try
       {
          producerCreditManager.close();
@@ -905,7 +912,7 @@
    {
       synchronized (this)
       {
-         if (closed)
+         if (closed/* || closing*/)
          {
             return;
          }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java	2011-08-09 02:27:09 UTC (rev 11153)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java	2011-08-09 02:57:54 UTC (rev 11154)
@@ -16,6 +16,7 @@
 import java.io.Serializable;
 import java.util.List;
 
+import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 
 /**
@@ -48,6 +49,8 @@
    private final long maxRetryInterval;
    
    private final int reconnectAttempts;
+   
+   private final long callTimeout;
 
    private final boolean duplicateDetection;
 
@@ -83,6 +86,7 @@
            ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
            ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
            ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
+           HornetQClient.DEFAULT_CALL_TIMEOUT,
            duplicateDetection,
            forwardWhenNoConsumers,
            maxHops,
@@ -101,6 +105,7 @@
                                          final double retryIntervalMultiplier,
                                          final long maxRetryInterval,
                                          final int reconnectAttempts,
+                                         final long callTimeout,
                                          final boolean duplicateDetection,
                                          final boolean forwardWhenNoConsumers,
                                          final int maxHops,
@@ -119,6 +124,7 @@
       this.reconnectAttempts = reconnectAttempts;
       this.staticConnectors = staticConnectors;
       this.duplicateDetection = duplicateDetection;
+      this.callTimeout = callTimeout;
       this.forwardWhenNoConsumers = forwardWhenNoConsumers;
       discoveryGroupName = null;
       this.maxHops = maxHops;
@@ -146,6 +152,7 @@
            ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
            ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
            ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
+           HornetQClient.DEFAULT_CALL_TIMEOUT,
            duplicateDetection,
            forwardWhenNoConsumers,
            maxHops,
@@ -163,6 +170,7 @@
                                          final double retryIntervalMultiplier,
                                          final long maxRetryInterval,
                                          final int reconnectAttempts,
+                                         final long callTimeout,
                                          final boolean duplicateDetection,
                                          final boolean forwardWhenNoConsumers,
                                          final int maxHops,
@@ -178,6 +186,7 @@
       this.retryIntervalMultiplier = retryIntervalMultiplier;
       this.maxRetryInterval = maxRetryInterval;
       this.reconnectAttempts = reconnectAttempts;
+      this.callTimeout = callTimeout;
       this.duplicateDetection = duplicateDetection;
       this.forwardWhenNoConsumers = forwardWhenNoConsumers;
       this.discoveryGroupName = discoveryGroupName;
@@ -237,6 +246,11 @@
       return reconnectAttempts;
    }
    
+   public long getCallTimeout()
+   {
+      return callTimeout;
+   }
+   
    public String getConnectorName()
    {
       return connectorName;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java	2011-08-09 02:27:09 UTC (rev 11153)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java	2011-08-09 02:57:54 UTC (rev 11154)
@@ -1015,6 +1015,8 @@
                                                         "retry-interval",
                                                         ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL,
                                                         Validators.GT_ZERO);
+      
+      long callTimeout = XMLConfigurationUtil.getLong(e, "call-timeout", HornetQClient.DEFAULT_CALL_TIMEOUT, Validators.GT_ZERO);
                                                         
       double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(e, "retry-interval-multiplier", 
                                                                       ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER, Validators.GT_ZERO);
@@ -1069,6 +1071,7 @@
                                                      retryIntervalMultiplier,
                                                      maxRetryInterval,
                                                      reconnectAttempts,
+                                                     callTimeout,
                                                      duplicateDetection,
                                                      forwardWhenNoConsumers,
                                                      maxHops,
@@ -1087,6 +1090,7 @@
                                                      retryIntervalMultiplier,
                                                      maxRetryInterval,
                                                      reconnectAttempts,
+                                                     callTimeout,
                                                      duplicateDetection,
                                                      forwardWhenNoConsumers,
                                                      maxHops,

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java	2011-08-09 02:27:09 UTC (rev 11153)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java	2011-08-09 02:57:54 UTC (rev 11154)
@@ -1522,10 +1522,16 @@
    {
       checkStarted();
 
-      if (pageSizeBytes >= maxSizeBytes)
+      // JBPAPP-6334 requested this to be pageSizeBytes > maxSizeBytes
+      if (pageSizeBytes > maxSizeBytes && maxSizeBytes > 0)
       {
          throw new IllegalStateException("pageSize has to be lower than maxSizeBytes. Invalid argument (" + pageSizeBytes + " < " + maxSizeBytes + ")");
       }
+      
+      if (maxSizeBytes < -1 )
+      {
+    	  throw new IllegalStateException("Invalid argument on maxSizeBytes");
+      }
 
       AddressSettings addressSettings = new AddressSettings();
       addressSettings.setDeadLetterAddress(DLA == null ? null : new SimpleString(DLA));

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-09 02:27:09 UTC (rev 11153)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-09 02:57:54 UTC (rev 11154)
@@ -33,6 +33,7 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.ServerLocatorImpl;
@@ -94,6 +95,8 @@
    private final long connectionTTL;
 
    private final long retryInterval;
+   
+   private final long callTimeout;
 
    private final double retryIntervalMultiplier;
 
@@ -104,6 +107,8 @@
    private final boolean useDuplicateDetection;
 
    private final boolean routeWhenNoConsumers;
+   
+   private final int confirmationWindowSize;
 
    private final Map<String, MessageFlowRecord> records = new ConcurrentHashMap<String, MessageFlowRecord>();
 
@@ -145,6 +150,7 @@
                                 final double retryIntervalMultiplier,
                                 final long maxRetryInterval,
                                 final int reconnectAttempts,
+                                final long callTimeout,
                                 final boolean useDuplicateDetection,
                                 final boolean routeWhenNoConsumers,
                                 final int confirmationWindowSize,
@@ -189,6 +195,8 @@
       this.useDuplicateDetection = useDuplicateDetection;
 
       this.routeWhenNoConsumers = routeWhenNoConsumers;
+      
+      this.confirmationWindowSize = confirmationWindowSize;
 
       this.executorFactory = executorFactory;
 
@@ -213,6 +221,8 @@
       this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
 
       this.manager = manager;
+      
+      this.callTimeout = callTimeout;
 
       this.clusterManagerTopology = clusterManagerTopology;
 
@@ -242,6 +252,7 @@
                                 final double retryIntervalMultiplier,
                                 final long maxRetryInterval,
                                 final int reconnectAttempts,
+                                final long callTimeout,
                                 final boolean useDuplicateDetection,
                                 final boolean routeWhenNoConsumers,
                                 final int confirmationWindowSize,
@@ -282,11 +293,15 @@
       this.maxRetryInterval = maxRetryInterval;
 
       this.reconnectAttempts = reconnectAttempts;
+      
+      this.callTimeout = callTimeout;
 
       this.useDuplicateDetection = useDuplicateDetection;
 
       this.routeWhenNoConsumers = routeWhenNoConsumers;
 
+      this.confirmationWindowSize = confirmationWindowSize;
+
       this.executorFactory = executorFactory;
 
       this.executor = executorFactory.getExecutor();
@@ -449,33 +464,26 @@
 
       if (serverLocator != null)
       {
+
+         if (!useDuplicateDetection)
+         {
+            log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
+         }
+
          serverLocator.setNodeID(nodeUUID.toString());
          serverLocator.setIdentity("(main-ClusterConnection::" + server.toString() + ")");
-
          serverLocator.setReconnectAttempts(0);
-
          serverLocator.setClusterConnection(true);
          serverLocator.setClusterTransportConfiguration(connector);
          serverLocator.setBackup(server.getConfiguration().isBackup());
          serverLocator.setInitialConnectAttempts(-1);
-
          serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
          serverLocator.setConnectionTTL(connectionTTL);
-
-         if (serverLocator.getConfirmationWindowSize() < 0)
-         {
-            // We can't have confirmationSize = -1 on the cluster Bridge
-            // Otherwise we won't have confirmation working
-            serverLocator.setConfirmationWindowSize(0);
-         }
-
-         if (!useDuplicateDetection)
-         {
-            log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
-         }
+         serverLocator.setConfirmationWindowSize(confirmationWindowSize);
          // if not using duplicate detection, we will send blocked
          serverLocator.setBlockOnDurableSend(!useDuplicateDetection);
          serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
+         serverLocator.setCallTimeout(callTimeout);
 
          if (retryInterval > 0)
          {
@@ -690,7 +698,7 @@
       targetLocator.setConnectionTTL(connectionTTL);
       targetLocator.setInitialConnectAttempts(0);
 
-      targetLocator.setConfirmationWindowSize(serverLocator.getConfirmationWindowSize());
+      targetLocator.setConfirmationWindowSize(confirmationWindowSize);
       targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
       targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
       targetLocator.setClusterConnection(true);

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-09 02:27:09 UTC (rev 11153)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-09 02:57:54 UTC (rev 11154)
@@ -808,6 +808,7 @@
                                                        config.getRetryIntervalMultiplier(),
                                                        config.getMaxRetryInterval(),
                                                        config.getReconnectAttempts(),
+                                                       config.getCallTimeout(),
                                                        config.isDuplicateDetection(),
                                                        config.isForwardWhenNoConsumers(),
                                                        config.getConfirmationWindowSize(),
@@ -845,6 +846,7 @@
                                                        config.getRetryIntervalMultiplier(),
                                                        config.getMaxRetryInterval(),
                                                        config.getReconnectAttempts(),
+                                                       config.getCallTimeout(),
                                                        config.isDuplicateDetection(),
                                                        config.isForwardWhenNoConsumers(),
                                                        config.getConfirmationWindowSize(),

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-09 02:27:09 UTC (rev 11153)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-09 02:57:54 UTC (rev 11154)
@@ -1880,6 +1880,7 @@
            ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
            ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
            reconnectAttempts,
+           1000,
            true,
            forwardWhenNoConsumers,
            maxHops,

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java	2011-08-09 02:27:09 UTC (rev 11153)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java	2011-08-09 02:57:54 UTC (rev 11154)
@@ -522,8 +522,7 @@
       }
       
       assertTrue("Exception expected", ex);
-
-      //restartServer();
+       //restartServer();
       serverControl = createManagementControl();
 
       String jsonString = serverControl.getAddressSettingsAsJSON(exactAddress);
@@ -540,6 +539,63 @@
       assertEquals(redistributionDelay, info.getRedistributionDelay());
       assertEquals(sendToDLAOnNoRoute, info.isSendToDLAOnNoRoute());
       assertEquals(addressFullMessagePolicy, info.getAddressFullMessagePolicy());
+      
+      serverControl.addAddressSettings(addressMatch,
+                                       DLA,
+                                       expiryAddress,
+                                       lastValueQueue,
+                                       deliveryAttempts,
+                                       -1,
+                                       1000,
+                                       pageMaxCacheSize,
+                                       redeliveryDelay,
+                                       redistributionDelay,
+                                       sendToDLAOnNoRoute,
+                                       addressFullMessagePolicy);
+      
+
+      jsonString = serverControl.getAddressSettingsAsJSON(exactAddress);
+      info = AddressSettingsInfo.from(jsonString);
+      
+      assertEquals(DLA, info.getDeadLetterAddress());
+      assertEquals(expiryAddress, info.getExpiryAddress());
+      assertEquals(lastValueQueue, info.isLastValueQueue());
+      assertEquals(deliveryAttempts, info.getMaxDeliveryAttempts());
+      assertEquals(-1, info.getMaxSizeBytes());
+      assertEquals(pageMaxCacheSize, info.getPageCacheMaxSize());
+      assertEquals(1000, info.getPageSizeBytes());
+      assertEquals(redeliveryDelay, info.getRedeliveryDelay());
+      assertEquals(redistributionDelay, info.getRedistributionDelay());
+      assertEquals(sendToDLAOnNoRoute, info.isSendToDLAOnNoRoute());
+      assertEquals(addressFullMessagePolicy, info.getAddressFullMessagePolicy());
+      
+      
+      ex = false;
+      try
+      {
+         serverControl.addAddressSettings(addressMatch,
+                                          DLA,
+                                          expiryAddress,
+                                          lastValueQueue,
+                                          deliveryAttempts,
+                                          -2,
+                                          1000,
+                                          pageMaxCacheSize,
+                                          redeliveryDelay,
+                                          redistributionDelay,
+                                          sendToDLAOnNoRoute,
+                                          addressFullMessagePolicy);
+      }
+      catch (Exception e)
+      {
+         ex = true;
+      }
+      
+      
+      assertTrue("Supposed to have an exception called", ex);
+
+
+
    }
 
    public void testCreateAndDestroyDivert() throws Exception

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-08-09 02:27:09 UTC (rev 11153)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-08-09 02:57:54 UTC (rev 11154)
@@ -954,7 +954,7 @@
       
       if (failed)
       {
-         logAndSystemOut("Thread leaged on test " + this.getClass().getName() + "::" + 
+         logAndSystemOut("Thread leaked on test " + this.getClass().getName() + "::" + 
                          this.getName() + "\n" + buffer.toString());
          fail("Thread leakage");
       }
@@ -1014,6 +1014,19 @@
          fail("invm registry still had acceptors registered");
       }
 
+      long timeout = System.currentTimeMillis() + 10000;
+      
+      while (AsynchronousFileImpl.getTotalMaxIO() != 0 && System.currentTimeMillis() > timeout)
+      {
+         try
+         {
+            Thread.sleep(500);
+         }
+         catch (Exception ignored)
+         {
+         }
+      }
+      
       if (AsynchronousFileImpl.getTotalMaxIO() != 0)
       {
          AsynchronousFileImpl.resetMaxAIO();



More information about the hornetq-commits mailing list