Author: clebert.suconic(a)jboss.com
Date: 2011-08-08 22:57:54 -0400 (Mon, 08 Aug 2011)
New Revision: 11154
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
This commit addresses several issues:
https://issues.jboss.org/browse/HORNETQ-711,
https://issues.jboss.org/browse/HORNETQ-716,
https://issues.jboss.org/browse/HORNETQ-743
(should indirectly fix JBPAPP-6522)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2011-08-09
02:27:09 UTC (rev 11153)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2011-08-09
02:57:54 UTC (rev 11154)
@@ -33,7 +33,9 @@
private final int windowSize;
- private boolean blocked;
+ private volatile boolean closed;
+
+ private boolean blocked;
private final SimpleString address;
@@ -68,15 +70,18 @@
if (!semaphore.tryAcquire(credits))
{
- this.blocked = true;
- try
+ if (!closed)
{
- semaphore.acquire(credits);
+ this.blocked = true;
+ try
+ {
+ semaphore.acquire(credits);
+ }
+ finally
+ {
+ this.blocked = false;
+ }
}
- finally
- {
- this.blocked = false;
- }
}
}
@@ -118,6 +123,7 @@
public void close()
{
// Closing a producer that is blocking should make it return
+ closed = true;
semaphore.release(Integer.MAX_VALUE / 2);
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-08-09
02:27:09 UTC (rev 11153)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-08-09
02:57:54 UTC (rev 11154)
@@ -133,6 +133,8 @@
private final Map<Long, ClientConsumerInternal> consumers = new
LinkedHashMap<Long, ClientConsumerInternal>();
private volatile boolean closed;
+
+ private volatile boolean closing;
private final boolean autoCommitAcks;
@@ -857,6 +859,11 @@
log.debug("Calling close on session " + this);
}
+ synchronized (this)
+ {
+ closing = true;
+ }
+
try
{
producerCreditManager.close();
@@ -905,7 +912,7 @@
{
synchronized (this)
{
- if (closed)
+ if (closed/* || closing*/)
{
return;
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-08-09
02:27:09 UTC (rev 11153)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-08-09
02:57:54 UTC (rev 11154)
@@ -16,6 +16,7 @@
import java.io.Serializable;
import java.util.List;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.impl.ConfigurationImpl;
/**
@@ -48,6 +49,8 @@
private final long maxRetryInterval;
private final int reconnectAttempts;
+
+ private final long callTimeout;
private final boolean duplicateDetection;
@@ -83,6 +86,7 @@
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
+ HornetQClient.DEFAULT_CALL_TIMEOUT,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
@@ -101,6 +105,7 @@
final double retryIntervalMultiplier,
final long maxRetryInterval,
final int reconnectAttempts,
+ final long callTimeout,
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
final int maxHops,
@@ -119,6 +124,7 @@
this.reconnectAttempts = reconnectAttempts;
this.staticConnectors = staticConnectors;
this.duplicateDetection = duplicateDetection;
+ this.callTimeout = callTimeout;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
discoveryGroupName = null;
this.maxHops = maxHops;
@@ -146,6 +152,7 @@
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
+ HornetQClient.DEFAULT_CALL_TIMEOUT,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
@@ -163,6 +170,7 @@
final double retryIntervalMultiplier,
final long maxRetryInterval,
final int reconnectAttempts,
+ final long callTimeout,
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
final int maxHops,
@@ -178,6 +186,7 @@
this.retryIntervalMultiplier = retryIntervalMultiplier;
this.maxRetryInterval = maxRetryInterval;
this.reconnectAttempts = reconnectAttempts;
+ this.callTimeout = callTimeout;
this.duplicateDetection = duplicateDetection;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
this.discoveryGroupName = discoveryGroupName;
@@ -237,6 +246,11 @@
return reconnectAttempts;
}
+ public long getCallTimeout()
+ {
+ return callTimeout;
+ }
+
public String getConnectorName()
{
return connectorName;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-08-09
02:27:09 UTC (rev 11153)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-08-09
02:57:54 UTC (rev 11154)
@@ -1015,6 +1015,8 @@
"retry-interval",
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL,
Validators.GT_ZERO);
+
+ long callTimeout = XMLConfigurationUtil.getLong(e, "call-timeout",
HornetQClient.DEFAULT_CALL_TIMEOUT, Validators.GT_ZERO);
double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(e,
"retry-interval-multiplier",
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER, Validators.GT_ZERO);
@@ -1069,6 +1071,7 @@
retryIntervalMultiplier,
maxRetryInterval,
reconnectAttempts,
+ callTimeout,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
@@ -1087,6 +1090,7 @@
retryIntervalMultiplier,
maxRetryInterval,
reconnectAttempts,
+ callTimeout,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-08-09
02:27:09 UTC (rev 11153)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-08-09
02:57:54 UTC (rev 11154)
@@ -1522,10 +1522,16 @@
{
checkStarted();
- if (pageSizeBytes >= maxSizeBytes)
+ // JBPAPP-6334 requested this to be pageSizeBytes > maxSizeBytes
+ if (pageSizeBytes > maxSizeBytes && maxSizeBytes > 0)
{
throw new IllegalStateException("pageSize has to be lower than
maxSizeBytes. Invalid argument (" + pageSizeBytes + " < " + maxSizeBytes
+ ")");
}
+
+ if (maxSizeBytes < -1 )
+ {
+ throw new IllegalStateException("Invalid argument on maxSizeBytes");
+ }
AddressSettings addressSettings = new AddressSettings();
addressSettings.setDeadLetterAddress(DLA == null ? null : new SimpleString(DLA));
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-09
02:27:09 UTC (rev 11153)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-09
02:57:54 UTC (rev 11154)
@@ -33,6 +33,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorImpl;
@@ -94,6 +95,8 @@
private final long connectionTTL;
private final long retryInterval;
+
+ private final long callTimeout;
private final double retryIntervalMultiplier;
@@ -104,6 +107,8 @@
private final boolean useDuplicateDetection;
private final boolean routeWhenNoConsumers;
+
+ private final int confirmationWindowSize;
private final Map<String, MessageFlowRecord> records = new
ConcurrentHashMap<String, MessageFlowRecord>();
@@ -145,6 +150,7 @@
final double retryIntervalMultiplier,
final long maxRetryInterval,
final int reconnectAttempts,
+ final long callTimeout,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
final int confirmationWindowSize,
@@ -189,6 +195,8 @@
this.useDuplicateDetection = useDuplicateDetection;
this.routeWhenNoConsumers = routeWhenNoConsumers;
+
+ this.confirmationWindowSize = confirmationWindowSize;
this.executorFactory = executorFactory;
@@ -213,6 +221,8 @@
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
this.manager = manager;
+
+ this.callTimeout = callTimeout;
this.clusterManagerTopology = clusterManagerTopology;
@@ -242,6 +252,7 @@
final double retryIntervalMultiplier,
final long maxRetryInterval,
final int reconnectAttempts,
+ final long callTimeout,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
final int confirmationWindowSize,
@@ -282,11 +293,15 @@
this.maxRetryInterval = maxRetryInterval;
this.reconnectAttempts = reconnectAttempts;
+
+ this.callTimeout = callTimeout;
this.useDuplicateDetection = useDuplicateDetection;
this.routeWhenNoConsumers = routeWhenNoConsumers;
+ this.confirmationWindowSize = confirmationWindowSize;
+
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
@@ -449,33 +464,26 @@
if (serverLocator != null)
{
+
+ if (!useDuplicateDetection)
+ {
+ log.debug("DuplicateDetection is disabled, sending clustered messages
blocked");
+ }
+
serverLocator.setNodeID(nodeUUID.toString());
serverLocator.setIdentity("(main-ClusterConnection::" +
server.toString() + ")");
-
serverLocator.setReconnectAttempts(0);
-
serverLocator.setClusterConnection(true);
serverLocator.setClusterTransportConfiguration(connector);
serverLocator.setBackup(server.getConfiguration().isBackup());
serverLocator.setInitialConnectAttempts(-1);
-
serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
serverLocator.setConnectionTTL(connectionTTL);
-
- if (serverLocator.getConfirmationWindowSize() < 0)
- {
- // We can't have confirmationSize = -1 on the cluster Bridge
- // Otherwise we won't have confirmation working
- serverLocator.setConfirmationWindowSize(0);
- }
-
- if (!useDuplicateDetection)
- {
- log.debug("DuplicateDetection is disabled, sending clustered messages
blocked");
- }
+ serverLocator.setConfirmationWindowSize(confirmationWindowSize);
// if not using duplicate detection, we will send blocked
serverLocator.setBlockOnDurableSend(!useDuplicateDetection);
serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
+ serverLocator.setCallTimeout(callTimeout);
if (retryInterval > 0)
{
@@ -690,7 +698,7 @@
targetLocator.setConnectionTTL(connectionTTL);
targetLocator.setInitialConnectAttempts(0);
-
targetLocator.setConfirmationWindowSize(serverLocator.getConfirmationWindowSize());
+ targetLocator.setConfirmationWindowSize(confirmationWindowSize);
targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
targetLocator.setClusterConnection(true);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-09
02:27:09 UTC (rev 11153)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-09
02:57:54 UTC (rev 11154)
@@ -808,6 +808,7 @@
config.getRetryIntervalMultiplier(),
config.getMaxRetryInterval(),
config.getReconnectAttempts(),
+ config.getCallTimeout(),
config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(),
config.getConfirmationWindowSize(),
@@ -845,6 +846,7 @@
config.getRetryIntervalMultiplier(),
config.getMaxRetryInterval(),
config.getReconnectAttempts(),
+ config.getCallTimeout(),
config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(),
config.getConfirmationWindowSize(),
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-09
02:27:09 UTC (rev 11153)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-09
02:57:54 UTC (rev 11154)
@@ -1880,6 +1880,7 @@
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
reconnectAttempts,
+ 1000,
true,
forwardWhenNoConsumers,
maxHops,
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2011-08-09
02:27:09 UTC (rev 11153)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2011-08-09
02:57:54 UTC (rev 11154)
@@ -522,8 +522,7 @@
}
assertTrue("Exception expected", ex);
-
- //restartServer();
+ //restartServer();
serverControl = createManagementControl();
String jsonString = serverControl.getAddressSettingsAsJSON(exactAddress);
@@ -540,6 +539,63 @@
assertEquals(redistributionDelay, info.getRedistributionDelay());
assertEquals(sendToDLAOnNoRoute, info.isSendToDLAOnNoRoute());
assertEquals(addressFullMessagePolicy, info.getAddressFullMessagePolicy());
+
+ serverControl.addAddressSettings(addressMatch,
+ DLA,
+ expiryAddress,
+ lastValueQueue,
+ deliveryAttempts,
+ -1,
+ 1000,
+ pageMaxCacheSize,
+ redeliveryDelay,
+ redistributionDelay,
+ sendToDLAOnNoRoute,
+ addressFullMessagePolicy);
+
+
+ jsonString = serverControl.getAddressSettingsAsJSON(exactAddress);
+ info = AddressSettingsInfo.from(jsonString);
+
+ assertEquals(DLA, info.getDeadLetterAddress());
+ assertEquals(expiryAddress, info.getExpiryAddress());
+ assertEquals(lastValueQueue, info.isLastValueQueue());
+ assertEquals(deliveryAttempts, info.getMaxDeliveryAttempts());
+ assertEquals(-1, info.getMaxSizeBytes());
+ assertEquals(pageMaxCacheSize, info.getPageCacheMaxSize());
+ assertEquals(1000, info.getPageSizeBytes());
+ assertEquals(redeliveryDelay, info.getRedeliveryDelay());
+ assertEquals(redistributionDelay, info.getRedistributionDelay());
+ assertEquals(sendToDLAOnNoRoute, info.isSendToDLAOnNoRoute());
+ assertEquals(addressFullMessagePolicy, info.getAddressFullMessagePolicy());
+
+
+ ex = false;
+ try
+ {
+ serverControl.addAddressSettings(addressMatch,
+ DLA,
+ expiryAddress,
+ lastValueQueue,
+ deliveryAttempts,
+ -2,
+ 1000,
+ pageMaxCacheSize,
+ redeliveryDelay,
+ redistributionDelay,
+ sendToDLAOnNoRoute,
+ addressFullMessagePolicy);
+ }
+ catch (Exception e)
+ {
+ ex = true;
+ }
+
+
+ assertTrue("Supposed to have an exception called", ex);
+
+
+
}
public void testCreateAndDestroyDivert() throws Exception
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-09
02:27:09 UTC (rev 11153)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-09
02:57:54 UTC (rev 11154)
@@ -954,7 +954,7 @@
if (failed)
{
- logAndSystemOut("Thread leaged on test " + this.getClass().getName() +
"::" +
+ logAndSystemOut("Thread leaked on test " + this.getClass().getName() +
"::" +
this.getName() + "\n" + buffer.toString());
fail("Thread leakage");
}
@@ -1014,6 +1014,19 @@
fail("invm registry still had acceptors registered");
}
+ long timeout = System.currentTimeMillis() + 10000;
+
+ while (AsynchronousFileImpl.getTotalMaxIO() != 0 &&
System.currentTimeMillis() > timeout)
+ {
+ try
+ {
+ Thread.sleep(500);
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+
if (AsynchronousFileImpl.getTotalMaxIO() != 0)
{
AsynchronousFileImpl.resetMaxAIO();