Author: clebert.suconic(a)jboss.com
Date: 2011-06-15 11:15:58 -0400 (Wed, 15 Jun 2011)
New Revision: 10811
Modified:
branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
HORNETQ-716 - Cleanup on clustering
Modified: branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd 2011-06-15
14:15:02 UTC (rev 10810)
+++ branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd 2011-06-15
15:15:58 UTC (rev 10811)
@@ -313,12 +313,18 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="transformer-class-name" type="xsd:string">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="check-period" type="xsd:long">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="connection-ttl" type="xsd:long">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="retry-interval" type="xsd:long">
- </xsd:element>
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="retry-interval-multiplier" type="xsd:double">
- </xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0"
name="reconnect-attempts" type="xsd:int">
- </xsd:element>
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="max-retry-interval" type="xsd:long">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="reconnect-attempts" type="xsd:long">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="failover-on-server-shutdown" type="xsd:boolean">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="use-duplicate-detection" type="xsd:boolean">
@@ -354,8 +360,18 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="1"
name="connector-ref" type="xsd:string">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="check-period" type="xsd:long">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="connection-ttl" type="xsd:long">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="retry-interval" type="xsd:long">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="retry-interval-multiplier" type="xsd:double">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="max-retry-interval" type="xsd:long">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="reconnect-attempts" type="xsd:long">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="use-duplicate-detection" type="xsd:boolean">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="forward-when-no-consumers" type="xsd:boolean">
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -137,4 +137,6 @@
ServerLocator getServerLocator();
CoreRemotingConnection getConnection();
+
+ boolean isClosed();
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -83,6 +83,8 @@
private static final long serialVersionUID = 2512460695662741413L;
private static final Logger log = Logger.getLogger(ClientSessionFactoryImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
// Attributes
//
-----------------------------------------------------------------------------------
@@ -404,10 +406,10 @@
public void causeExit()
{
- exitLoop = true;
synchronized (waitLock)
{
- waitLock.notify();
+ exitLoop = true;
+ waitLock.notifyAll();
}
}
@@ -418,7 +420,7 @@
return;
}
- // we need to stopthe factory from connecting if it is in the middle aof trying to
failover before we get the lock
+ // we need to stop the factory from connecting if it is in the middle of trying to
failover before we get the lock
causeExit();
synchronized (createSessionLock)
{
@@ -449,7 +451,12 @@
closed = true;
}
- public ServerLocator getServerLocator()
+ public boolean isClosed()
+ {
+ return closed;
+ }
+
+ public ServerLocator getServerLocator()
{
return serverLocator;
}
@@ -497,6 +504,12 @@
return;
}
+
+ if (isTrace)
+ {
+ log.trace("Client Connection failed, calling failure listeners and
trying to reconnect, reconnectAttempts=" + reconnectAttempts);
+ }
+
// We call before reconnection occurs to give the user a chance to do cleanup,
like cancel messages
callFailureListeners(me, false, false);
@@ -881,13 +894,8 @@
synchronized (waitLock)
{
- while (true)
+ while (!exitLoop)
{
- if (exitLoop)
- {
- return;
- }
-
if (log.isDebugEnabled())
{
log.debug("Trying reconnection attempt " + count);
@@ -910,14 +918,21 @@
return;
}
+ if (isTrace)
+ {
+ log.trace("Waiting " + interval +
+ " milliseconds before next retry.
RetryInterval=" + retryInterval +
+ " and multiplier = " +
retryIntervalMultiplier);
+ }
+
try
{
- waitLock.wait(interval);
+ waitLock.wait(interval);
}
catch (InterruptedException ignore)
{
}
-
+
// Exponential back-off
long newInterval = (long)(interval * retryIntervalMultiplier);
@@ -1086,6 +1101,13 @@
}
}
}
+ else
+ {
+ if (isTrace)
+ {
+ log.trace("No Backup configured!");
+ }
+ }
}
catch (Exception e)
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -843,8 +843,11 @@
{
if (closed)
{
+ log.debug("Session was already closed, giving up now, this=" + this);
return;
}
+
+ log.debug("Calling close on session " + this);
try
{
@@ -1607,6 +1610,10 @@
return remotingConnection;
}
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
public String toString()
{
StringBuffer buffer = new StringBuffer();
@@ -1614,7 +1621,8 @@
{
buffer.append(entry.getKey() + "=" + entry.getValue() +
",");
}
- return "ClientSessionImpl::(" + buffer.toString() + ")";
+
+ return "ClientSessionImpl [name=" + name + ", username=" +
username + ", closed=" + closed + " metaData=(" + buffer +
")]@" + Integer.toHexString(hashCode()) ;
}
// Protected
@@ -1776,12 +1784,21 @@
private void doCleanup(boolean failingOver)
{
remotingConnection.removeFailureListener(this);
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("calling cleanup on " + this);
+ }
synchronized (this)
{
closed = true;
channel.close();
+
+ // if the server is sending a disconnect
+ // any pending blocked operation could hang without this
+ channel.returnBlocking();
}
sessionFactory.removeSession(this, failingOver);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -578,4 +578,10 @@
{
return session.isCompressLargeMessages();
}
+
+ public String toString()
+ {
+ return "DelegatingSession [session=" + session + "]";
+ }
+
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -49,6 +49,8 @@
public class DiscoveryGroupImpl implements Runnable, DiscoveryGroup
{
private static final Logger log = Logger.getLogger(DiscoveryGroupImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
private static final int SOCKET_TIMEOUT = 500;
@@ -375,6 +377,14 @@
if (changed)
{
+ if (isTrace)
+ {
+ log.trace("Connectors changed on Discovery:");
+ for (DiscoveryEntry connector : connectors.values())
+ {
+ log.trace(connector);
+ }
+ }
callListeners();
}
@@ -438,6 +448,10 @@
if (entry.getValue().getLastUpdate() + timeout <= now)
{
+ if (isTrace)
+ {
+ log.trace("Timed out node on discovery:" + entry.getValue());
+ }
iter.remove();
changed = true;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -61,17 +61,24 @@
private String password;
+ private final long connectionTTL;
+
+ private final long maxRetryInterval;
+
+
public BridgeConfiguration(final String name,
final String queueName,
final String forwardingAddress,
final String filterString,
final String transformerClassName,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
final long retryInterval,
+ final long maxRetryInterval,
final double retryIntervalMultiplier,
final int reconnectAttempts,
final boolean useDuplicateDetection,
final int confirmationWindowSize,
- final long clientFailureCheckPeriod,
final List<String> staticConnectors,
final boolean ha,
final String user,
@@ -89,8 +96,10 @@
this.confirmationWindowSize = confirmationWindowSize;
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
this.staticConnectors = staticConnectors;
- this.user = user;
+ this. user = user;
this.password = password;
+ this.connectionTTL = connectionTTL;
+ this.maxRetryInterval = maxRetryInterval;
discoveryGroupName = null;
}
@@ -99,12 +108,14 @@
final String forwardingAddress,
final String filterString,
final String transformerClassName,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
final long retryInterval,
+ final long maxRetryInterval,
final double retryIntervalMultiplier,
final int reconnectAttempts,
final boolean useDuplicateDetection,
final int confirmationWindowSize,
- final long clientFailureCheckPeriod,
final String discoveryGroupName,
final boolean ha,
final String user,
@@ -126,6 +137,8 @@
this.ha = ha;
this.user = user;
this.password = password;
+ this.connectionTTL = connectionTTL;
+ this.maxRetryInterval = maxRetryInterval;
}
public String getName()
@@ -138,6 +151,22 @@
return queueName;
}
+ /**
+ * @return the connectionTTL
+ */
+ public long getConnectionTTL()
+ {
+ return connectionTTL;
+ }
+
+ /**
+ * @return the maxRetryInterval
+ */
+ public long getMaxRetryInterval()
+ {
+ return maxRetryInterval;
+ }
+
public String getForwardingAddress()
{
return forwardingAddress;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -16,6 +16,8 @@
import java.io.Serializable;
import java.util.List;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+
/**
* A ClusterConnectionConfiguration
*
@@ -35,7 +37,17 @@
private final String connectorName;
+ private final long clientFailureCheckPeriod;
+
+ private final long connectionTTL;
+
private final long retryInterval;
+
+ private final double retryIntervalMultiplier;
+
+ private final long maxRetryInterval;
+
+ private final int reconnectAttempts;
private final boolean duplicateDetection;
@@ -50,11 +62,45 @@
private final int confirmationWindowSize;
private final boolean allowDirectConnectionsOnly;
+
+ public ClusterConnectionConfiguration(final String name,
+ final String address,
+ final String connectorName,
+ final long retryInterval,
+ final boolean duplicateDetection,
+ final boolean forwardWhenNoConsumers,
+ final int maxHops,
+ final int confirmationWindowSize,
+ final List<String> staticConnectors,
+ final boolean allowDirectConnectionsOnly)
+ {
+ this(name,
+ address,
+ connectorName,
+ ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
+ ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
+ retryInterval,
+ ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
+ ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
+ ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
+ duplicateDetection,
+ forwardWhenNoConsumers,
+ maxHops,
+ confirmationWindowSize,
+ staticConnectors,
+ allowDirectConnectionsOnly);
+ }
+
public ClusterConnectionConfiguration(final String name,
final String address,
final String connectorName,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
final long retryInterval,
+ final double retryIntervalMultiplier,
+ final long maxRetryInterval,
+ final int reconnectAttempts,
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
final int maxHops,
@@ -65,7 +111,12 @@
this.name = name;
this.address = address;
this.connectorName = connectorName;
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+ this.connectionTTL = connectionTTL;
this.retryInterval = retryInterval;
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ this.maxRetryInterval = maxRetryInterval;
+ this.reconnectAttempts = reconnectAttempts;
this.staticConnectors = staticConnectors;
this.duplicateDetection = duplicateDetection;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
@@ -75,6 +126,7 @@
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
}
+
public ClusterConnectionConfiguration(final String name,
final String address,
final String connectorName,
@@ -85,10 +137,47 @@
final int confirmationWindowSize,
final String discoveryGroupName)
{
+ this(name,
+ address,
+ connectorName,
+ ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
+ ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
+ retryInterval,
+ ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
+ ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
+ ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
+ duplicateDetection,
+ forwardWhenNoConsumers,
+ maxHops,
+ confirmationWindowSize,
+ discoveryGroupName);
+ }
+
+
+ public ClusterConnectionConfiguration(final String name,
+ final String address,
+ final String connectorName,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
+ final long retryInterval,
+ final double retryIntervalMultiplier,
+ final long maxRetryInterval,
+ final int reconnectAttempts,
+ final boolean duplicateDetection,
+ final boolean forwardWhenNoConsumers,
+ final int maxHops,
+ final int confirmationWindowSize,
+ final String discoveryGroupName)
+ {
this.name = name;
this.address = address;
this.connectorName = connectorName;
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+ this.connectionTTL = connectionTTL;
this.retryInterval = retryInterval;
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ this.maxRetryInterval = maxRetryInterval;
+ this.reconnectAttempts = reconnectAttempts;
this.duplicateDetection = duplicateDetection;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
this.discoveryGroupName = discoveryGroupName;
@@ -108,6 +197,46 @@
return address;
}
+ /**
+ * @return the clientFailureCheckPeriod
+ */
+ public long getClientFailureCheckPeriod()
+ {
+ return clientFailureCheckPeriod;
+ }
+
+ /**
+ * @return the connectionTTL
+ */
+ public long getConnectionTTL()
+ {
+ return connectionTTL;
+ }
+
+ /**
+ * @return the retryIntervalMultiplier
+ */
+ public double getRetryIntervalMultiplier()
+ {
+ return retryIntervalMultiplier;
+ }
+
+ /**
+ * @return the maxRetryInterval
+ */
+ public long getMaxRetryInterval()
+ {
+ return maxRetryInterval;
+ }
+
+ /**
+ * @return the reconnectAttempts
+ */
+ public int getReconnectAttempts()
+ {
+ return reconnectAttempts;
+ }
+
public String getConnectorName()
{
return connectorName;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -24,6 +24,7 @@
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
@@ -166,6 +167,16 @@
public static final long DEFAULT_CLUSTER_RETRY_INTERVAL = 500;
+ public static final int DEFAULT_CLUSTER_RECONNECT_ATTEMPTS = -1;
+
+ public static final long DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD =
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+
+ public static final long DEFAULT_CLUSTER_CONNECTION_TTL =
HornetQClient.DEFAULT_CONNECTION_TTL;
+
+ public static final double DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER =
HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
+ public static final long DEFAULT_CLUSTER_MAX_RETRY_INTERVAL =
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
+
public static final boolean DEFAULT_DIVERT_EXCLUSIVE = false;
public static final boolean DEFAULT_BRIDGE_DUPLICATE_DETECTION = true;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -1003,12 +1003,27 @@
"max-hops",
ConfigurationImpl.DEFAULT_CLUSTER_MAX_HOPS,
Validators.GE_ZERO);
+
+ long clientFailureCheckPeriod = XMLConfigurationUtil.getLong(e,
"check-period",
+
ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD, Validators.GT_ZERO) ;
+ long connectionTTL = XMLConfigurationUtil.getLong(e, "connection-ttl",
+
ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL, Validators.GT_ZERO) ;
+
+
long retryInterval = XMLConfigurationUtil.getLong(e,
"retry-interval",
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL,
Validators.GT_ZERO);
+
+ double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(e,
"retry-interval-multiplier",
+
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER, Validators.GT_ZERO);
+
+ long maxRetryInterval = XMLConfigurationUtil.getLong(e,
"max-retry-interval", ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
Validators.GT_ZERO);
+
+ int reconnectAttempts = XMLConfigurationUtil.getInteger(e,
"reconnect-attempts", ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
Validators.MINUS_ONE_OR_GE_ZERO);
+
int confirmationWindowSize = XMLConfigurationUtil.getInteger(e,
"confirmation-window-size",
FileConfiguration.DEFAULT_CONFIRMATION_WINDOW_SIZE,
@@ -1048,7 +1063,12 @@
config = new ClusterConnectionConfiguration(name,
address,
connectorName,
+ clientFailureCheckPeriod,
+ connectionTTL,
retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
@@ -1061,7 +1081,12 @@
config = new ClusterConnectionConfiguration(name,
address,
connectorName,
+ clientFailureCheckPeriod,
+ connectionTTL,
retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
@@ -1101,17 +1126,26 @@
null,
Validators.NO_CHECK);
+ // Default bridge conf
+ int confirmationWindowSize = XMLConfigurationUtil.getInteger(brNode,
+
"confirmation-window-size",
+
FileConfiguration.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ Validators.GT_ZERO);
+
long retryInterval = XMLConfigurationUtil.getLong(brNode,
"retry-interval",
HornetQClient.DEFAULT_RETRY_INTERVAL,
Validators.GT_ZERO);
- // Default bridge conf
- int confirmationWindowSize = XMLConfigurationUtil.getInteger(brNode,
-
"confirmation-window-size",
-
FileConfiguration.DEFAULT_CONFIRMATION_WINDOW_SIZE,
- Validators.GT_ZERO);
+ long clientFailureCheckPeriod = XMLConfigurationUtil.getLong(brNode,
"check-period",
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, Validators.GT_ZERO) ;
+ long connectionTTL = XMLConfigurationUtil.getLong(brNode,
"connection-ttl",
+
HornetQClient.DEFAULT_CONNECTION_TTL, Validators.GT_ZERO) ;
+
+ long maxRetryInterval = XMLConfigurationUtil.getLong(brNode,
"max-retry-interval", HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
Validators.GT_ZERO);
+
+
double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(brNode,
"retry-interval-multiplier",
HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
@@ -1173,12 +1207,14 @@
forwardingAddress,
filterString,
transformerClassName,
+ clientFailureCheckPeriod,
+ connectionTTL,
retryInterval,
+ maxRetryInterval,
retryIntervalMultiplier,
reconnectAttempts,
useDuplicateDetection,
confirmationWindowSize,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectorNames,
ha,
user,
@@ -1191,12 +1227,14 @@
forwardingAddress,
filterString,
transformerClassName,
+ clientFailureCheckPeriod,
+ connectionTTL,
retryInterval,
+ maxRetryInterval,
retryIntervalMultiplier,
reconnectAttempts,
useDuplicateDetection,
confirmationWindowSize,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
discoveryGroupName,
ha,
user,
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -241,6 +241,7 @@
try
{
bridge.stop();
+ bridge.flushExecutor();
}
finally
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -38,6 +38,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.BridgeControl;
import org.hornetq.api.core.management.DivertControl;
@@ -1702,12 +1703,14 @@
forwardingAddress,
filterString,
transformerClassName,
+ clientFailureCheckPeriod,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
useDuplicateDetection,
confirmationWindowSize,
- clientFailureCheckPeriod,
connectorNames,
ha,
user,
@@ -1721,12 +1724,14 @@
forwardingAddress,
filterString,
transformerClassName,
+ clientFailureCheckPeriod,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
useDuplicateDetection,
confirmationWindowSize,
- clientFailureCheckPeriod,
connectors,
ha,
user,
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -13,6 +13,8 @@
package org.hornetq.core.postoffice.impl;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
@@ -50,6 +52,8 @@
{
private static final Logger log = Logger.getLogger(BindingsImpl.class);
+ private static boolean isTrace = log.isTraceEnabled();
+
private final ConcurrentMap<SimpleString, List<Binding>>
routingNameBindingMap = new ConcurrentHashMap<SimpleString, List<Binding>>();
private final Map<SimpleString, Integer> routingNamePositions = new
ConcurrentHashMap<SimpleString, Integer>();
@@ -61,13 +65,16 @@
private volatile boolean routeWhenNoConsumers;
private final GroupingHandler groupingHandler;
-
+
private final PagingStore pageStore;
- public BindingsImpl(final GroupingHandler groupingHandler, final PagingStore
pageStore)
+ private final SimpleString name;
+
+ public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler,
final PagingStore pageStore)
{
this.groupingHandler = groupingHandler;
this.pageStore = pageStore;
+ this.name = name;
}
public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
@@ -82,6 +89,10 @@
public void addBinding(final Binding binding)
{
+ if (isTrace)
+ {
+ log.trace("addBinding(" + binding + ") being called");
+ }
if (binding.isExclusive())
{
exclusiveBindings.add(binding);
@@ -108,6 +119,12 @@
}
bindingsMap.put(binding.getID(), binding);
+
+ if (isTrace)
+ {
+ log.trace("Adding binding " + binding + " into " + this +
" bindingTable: " + debugBindings());
+ }
+
}
public void removeBinding(final Binding binding)
@@ -134,6 +151,11 @@
}
bindingsMap.remove(binding.getID());
+
+ if (isTrace)
+ {
+ log.trace("Removing binding " + binding + " into " + this +
" bindingTable: " + debugBindings());
+ }
}
public boolean redistribute(final ServerMessage message, final Queue originatingQueue,
final RoutingContext context) throws Exception
@@ -144,6 +166,11 @@
return false;
}
+ if (isTrace)
+ {
+ log.trace("Redistributing message " + message);
+ }
+
SimpleString routingName = originatingQueue.getName();
List<Binding> bindings = routingNameBindingMap.get(routingName);
@@ -222,12 +249,12 @@
return false;
}
}
-
+
public PagingStore getPagingStore()
{
return pageStore;
}
-
+
public void route(final ServerMessage message, final RoutingContext context) throws
Exception
{
boolean routed = false;
@@ -247,8 +274,8 @@
if (!routed)
{
- //TODO this is a little inefficient since we do the lookup once to see if the
property
- //is there, then do it again to remove the actual property
+ // TODO this is a little inefficient since we do the lookup once to see if the
property
+ // is there, then do it again to remove the actual property
if (message.containsProperty(MessageImpl.HDR_ROUTE_TO_IDS))
{
routeFromCluster(message, context);
@@ -259,6 +286,10 @@
}
else
{
+ if (isTrace)
+ {
+ log.trace("Routing message " + message + " on
binding=" + this);
+ }
for (Map.Entry<SimpleString, List<Binding>> entry :
routingNameBindingMap.entrySet())
{
SimpleString routingName = entry.getKey();
@@ -283,6 +314,15 @@
}
}
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "BindingsImpl [name=" + name + "]";
+ }
+
private Binding getNextBinding(final ServerMessage message,
final SimpleString routingName,
final List<Binding> bindings)
@@ -290,9 +330,9 @@
Integer ipos = routingNamePositions.get(routingName);
int pos = ipos != null ? ipos : 0;
-
+
int length = bindings.size();
-
+
int startPos = pos;
Binding theBinding = null;
@@ -470,7 +510,75 @@
}
}
}
+
+ private String debugBindings()
+ {
+ StringWriter writer = new StringWriter();
+ PrintWriter out = new PrintWriter(writer);
+
+ out.println("\n***************************************");
+
+ out.println("routingNameBindingMap:");
+ if (routingNameBindingMap.isEmpty())
+ {
+ out.println("EMPTY!");
+ }
+ for (Map.Entry<SimpleString, List<Binding>> entry :
routingNameBindingMap.entrySet())
+ {
+ out.print("key=" + entry.getKey() + ", value=" +
entry.getValue());
+// for (Binding bind : entry.getValue())
+// {
+// out.print(bind + ",");
+// }
+ out.println();
+ }
+
+ out.println();
+
+ out.println("RoutingNamePositions:");
+ if (routingNamePositions.isEmpty())
+ {
+ out.println("EMPTY!");
+ }
+ for (Map.Entry<SimpleString, Integer> entry :
routingNamePositions.entrySet())
+ {
+ out.println("key=" + entry.getKey() + ", value=" +
entry.getValue());
+ }
+
+ out.println();
+
+ out.println("BindingsMap:");
+
+ if (bindingsMap.isEmpty())
+ {
+ out.println("EMPTY!");
+ }
+ for (Map.Entry<Long, Binding> entry : bindingsMap.entrySet())
+ {
+ out.println("Key=" + entry.getKey() + ", value=" +
entry.getValue());
+ }
+
+ out.println();
+
+ out.println("ExclusiveBindings:");
+ if (exclusiveBindings.isEmpty())
+ {
+ out.println("EMPTY!");
+ }
+
+ for (Binding binding: exclusiveBindings)
+ {
+ out.println(binding);
+ }
+
+ out.println("#####################################################");
+
+
+ return writer.toString();
+ }
+
+
private void routeFromCluster(final ServerMessage message, final RoutingContext
context) throws Exception
{
byte[] ids = (byte[])message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -124,15 +124,30 @@
return BindingType.DIVERT;
}
+
+
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString()
{
- return "DivertBinding [divert=" + divert + "]";
+ return "DivertBinding [id=" + id +
+ ", address=" +
+ address +
+ ", divert=" +
+ divert +
+ ", filter=" +
+ filter +
+ ", uniqueName=" +
+ uniqueName +
+ ", routingName=" +
+ routingName +
+ ", exclusive=" +
+ exclusive +
+ "]";
}
-
+
public void close() throws Exception
{
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -77,6 +77,8 @@
public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory
{
private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
public static final SimpleString HDR_RESET_QUEUE_DATA = new
SimpleString("_HQ_RESET_QUEUE_DATA");
@@ -209,6 +211,10 @@
public void onNotification(final Notification notification)
{
+ if (isTrace)
+ {
+ log.trace("Receiving notification : " + notification);
+ }
synchronized (notificationLock)
{
NotificationType type = notification.getType();
@@ -1306,6 +1312,6 @@
public Bindings createBindings(final SimpleString address) throws Exception
{
- return new BindingsImpl(server.getGroupingHandler(),
pagingManager.getPageStore(address));
+ return new BindingsImpl(address, server.getGroupingHandler(),
pagingManager.getPageStore(address));
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -38,6 +38,8 @@
public class ChannelImpl implements Channel
{
private static final Logger log = Logger.getLogger(ChannelImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
private volatile long id;
@@ -159,6 +161,11 @@
synchronized (sendLock)
{
packet.setChannelID(id);
+
+ if (isTrace)
+ {
+ log.trace("Sending packet nonblocking " + packet + " on
channeID=" + id);
+ }
HornetQBuffer buffer = packet.encode(connection);
@@ -193,7 +200,13 @@
{
lock.unlock();
}
+
+ if (isTrace)
+ {
+ log.trace("Writing buffer for channelID=" + id);
+ }
+
// The actual send must be outside the lock, or with OIO transport, the write
can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking
failover
connection.getTransportConnection().write(buffer, flush, batch);
@@ -350,6 +363,10 @@
{
if (resendCache != null)
{
+ if (isTrace)
+ {
+ log.trace("Replaying commands on channelID=" + id);
+ }
clearUpTo(otherLastConfirmedCommandID);
for (final Packet packet : resendCache)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -43,8 +43,14 @@
boolean isUseDuplicateDetection();
void activate();
-
+
+ void flushExecutor();
+
void setNotificationService(NotificationService notificationService);
RemotingConnection getForwardingConnection();
+
+ void pause() throws Exception;
+
+ void resume() throws Exception;
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -33,7 +33,14 @@
Bridge getBridge();
void close() throws Exception;
+
+ public void resume() throws Exception;
+
+ boolean isClosed();
void reset() throws Exception;
+ void pause() throws Exception;
+
+ boolean isPaused();
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -23,10 +23,10 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession.BindingQuery;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.SendAcknowledgementHandler;
import org.hornetq.api.core.client.SessionFailureListener;
-import org.hornetq.api.core.client.ClientSession.BindingQuery;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -195,8 +195,17 @@
while ((ref = refs.poll()) != null)
{
+ if (isTrace)
+ {
+ log.trace("Cancelling reference " + ref + " on bridge " +
this);
+ }
list.addFirst(ref);
}
+
+ if (isTrace && list.isEmpty())
+ {
+ log.trace("didn't have any references to cancel on bridge " +
this);
+ }
Queue queue = null;
@@ -210,27 +219,58 @@
}
}
+
+ public void flushExecutor()
+ {
+ // Wait for any create objects runnable to complete
+ Future future = new Future();
+ executor.execute(future);
+
+ boolean ok = future.await(10000);
+
+ if (!ok)
+ {
+ BridgeImpl.log.warn("Timed out waiting to stop");
+ }
+ }
+
+
public void stop() throws Exception
{
- if (started)
+ if (log.isDebugEnabled())
+ {
+ log.debug("Bridge " + this.name + " being stopped");
+ }
+
+ stopping = true;
+
+ executor.execute(new StopRunnable());
+
+ if (notificationService != null)
{
- // We need to stop the csf here otherwise the stop runnable never runs since the
createobjectsrunnable is
- // trying to connect to the target
- // server which isn't up in an infinite loop
- if (csf != null)
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(new SimpleString("name"), name);
+ Notification notification = new Notification(nodeUUID.toString(),
NotificationType.BRIDGE_STOPPED, props);
+ try
{
- csf.close();
+ notificationService.sendNotification(notification);
}
+ catch (Exception e)
+ {
+ BridgeImpl.log.warn("unable to send notification when broadcast group is
stopped", e);
+ }
}
-
- log.info("Bridge " + this.name + " being stopped");
-
- stopping = true;
+ }
- executor.execute(new StopRunnable());
+ public void pause() throws Exception
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("Bridge " + this.name + " being paused");
+ }
- waitForRunnablesToComplete();
+ executor.execute(new PauseRunnable());
if (notificationService != null)
{
@@ -248,7 +288,13 @@
}
}
- public boolean isStarted()
+ public void resume() throws Exception
+ {
+ queue.addConsumer(BridgeImpl.this);
+ queue.deliverAsync();
+ }
+
+ public boolean isStarted()
{
return started;
}
@@ -366,7 +412,7 @@
{
return HandleStatus.NO_MATCH;
}
-
+
synchronized (this)
{
if (!active)
@@ -424,6 +470,10 @@
public void connectionFailed(final HornetQException me, boolean failedOver)
{
log.warn(name + "::Connection failed with failedOver=" + failedOver,
me);
+ if (isTrace)
+ {
+ log.trace("Calling BridgeImpl::connectionFailed(HOrnetQException me="
+ me + ", boolean failedOver=" + failedOver);
+ }
fail(false);
}
@@ -432,6 +482,8 @@
log.warn(name + "::Connection failed before reconnect ", exception);
fail(true);
}
+
+
// Package protected ---------------------------------------------
@@ -439,19 +491,29 @@
// Private -------------------------------------------------------
- private void waitForRunnablesToComplete()
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
{
- // Wait for any create objects runnable to complete
- Future future = new Future();
-
- executor.execute(future);
-
- boolean ok = future.await(10000);
-
- if (!ok)
- {
- BridgeImpl.log.warn("Timed out waiting to stop");
- }
+ return this.getClass().getName() +
+ " [name=" + name +
+ ", nodeUUID=" +
+ nodeUUID +
+ ", queue=" +
+ queue +
+ ", filter=" +
+ filter +
+ ", forwardingAddress=" +
+ forwardingAddress +
+ ", useDuplicateDetection=" +
+ useDuplicateDetection +
+ ", active=" +
+ active +
+ ", stopping=" +
+ stopping +
+ "]";
}
private void fail(final boolean beforeReconnect)
@@ -468,37 +530,41 @@
active = false;
}
- try
- {
+
if (!session.getConnection().isDestroyed())
{
if (beforeReconnect)
{
- synchronized (this)
+ try {
+ log.debug(name + "::Connection is destroyed, active = false
now");
+
+ cancelRefs();
+ }
+ catch (Exception e)
{
- log.debug(name + "::Connection is destroyed, active = false
now");
+ BridgeImpl.log.error("Failed to cancel refs", e);
}
-
- cancelRefs();
}
else
{
- afterConnect();
+ try
+ {
+ afterConnect();
- log.debug(name + "::After reconnect, setting active=true now");
- active = true;
+ log.debug(name + "::After reconnect, setting active=true
now");
+ active = true;
- if (queue != null)
+ if (queue != null)
+ {
+ queue.deliverAsync();
+ }
+ }
+ catch (Exception e)
{
- queue.deliverAsync();
+ BridgeImpl.log.error("Failed to call after connect", e);
}
}
}
- }
- catch (Exception e)
- {
- BridgeImpl.log.error("Failed to cancel refs", e);
- }
}
/* Hook for doing extra stuff after connection */
@@ -530,9 +596,21 @@
try
{
- csf = createSessionFactory();
- // Session is pre-acknowledge
- session = (ClientSessionInternal)csf.createSession(user, password, false,
true, true, true, 1);
+ if (csf == null || csf.isClosed())
+ {
+ csf = createSessionFactory();
+ // Session is pre-acknowledge
+ session = (ClientSessionInternal)csf.createSession(user, password, false,
true, true, true, 1);
+ try
+ {
+ session.addMetaData("Session-for-bridge", name.toString());
+ session.addMetaData("nodeUUID", nodeUUID.toString());
+ }
+ catch (Throwable dontCare)
+ {
+ // addMetaData here is just for debug purposes
+ }
+ }
if (forwardingAddress != null)
{
@@ -654,20 +732,25 @@
{
try
{
+ // We need to close the session outside of the lock,
+ // so any pending operation will be canceled right away
+
+ // TODO: Why closing the CSF will make a few clustering and failover tests to
+ // either deadlock or take forever on waiting
+ // locks
+ csf.close();
+ csf = null;
+ if (session != null)
+ {
+ log.debug("Cleaning up session " + session);
+ session.close();
+ session.removeFailureListener(BridgeImpl.this);
+ }
+
synchronized (BridgeImpl.this)
{
- if (!started)
- {
- return;
- }
-
log.debug("Closing Session for bridge " +
BridgeImpl.this.name);
- if (session != null)
- {
- session.close();
- }
-
started = false;
active = false;
@@ -692,6 +775,40 @@
}
}
+ private class PauseRunnable implements Runnable
+ {
+ public void run()
+ {
+ try
+ {
+ synchronized (BridgeImpl.this)
+ {
+ log.debug("Closing Session for bridge " +
BridgeImpl.this.name);
+
+ started = false;
+
+ active = false;
+
+ }
+
+ queue.removeConsumer(BridgeImpl.this);
+
+ cancelRefs();
+
+ if (queue != null)
+ {
+ queue.deliverAsync();
+ }
+
+ log.info("paused bridge " + name);
+ }
+ catch (Exception e)
+ {
+ BridgeImpl.log.error("Failed to pause bridge", e);
+ }
+ }
+ }
+
private class CreateObjectsRunnable implements Runnable
{
public synchronized void run()
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -50,6 +50,8 @@
public class ClusterConnectionBridge extends BridgeImpl
{
private static final Logger log = Logger.getLogger(ClusterConnectionBridge.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
private final MessageFlowRecord flowRecord;
@@ -195,7 +197,7 @@
flowRecord.getAddress() +
"%')");
- session.createQueue(managementNotificationAddress, notifQueueName, filter,
false);
+ session.createTemporaryQueue(managementNotificationAddress, notifQueueName,
filter);
notifConsumer = session.createConsumer(notifQueueName);
@@ -224,6 +226,12 @@
}
@Override
+ public void stop() throws Exception
+ {
+ super.stop();
+ }
+
+ @Override
protected ClientSessionFactory createSessionFactory() throws Exception
{
//We create the session factory using the specified connector
@@ -234,6 +242,11 @@
@Override
public void connectionFailed(HornetQException me, boolean failedOver)
{
+ if (isTrace)
+ {
+ log.trace("Connection Failed on ClusterConnectionBridge, failedOver = " +
failedOver + ", sessionClosed = " + session.isClosed(), new Exception
("trace"));
+ }
+
if (!failedOver && !session.isClosed())
{
try
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -65,6 +65,8 @@
public class ClusterConnectionImpl implements ClusterConnection
{
private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
private final org.hornetq.utils.ExecutorFactory executorFactory;
@@ -78,7 +80,17 @@
private final SimpleString address;
+ private final long clientFailureCheckPeriod;
+
+ private final long connectionTTL;
+
private final long retryInterval;
+
+ private final double retryIntervalMultiplier;
+
+ private final long maxRetryInterval;
+
+ private final int reconnectAttempts;
private final boolean useDuplicateDetection;
@@ -114,7 +126,12 @@
final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
final long retryInterval,
+ final double retryIntervalMultiplier,
+ final long maxRetryInterval,
+ final int reconnectAttempts,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
final int confirmationWindowSize,
@@ -144,7 +161,17 @@
this.address = address;
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+
+ this.connectionTTL = connectionTTL;
+
this.retryInterval = retryInterval;
+
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+
+ this.maxRetryInterval = maxRetryInterval;
+
+ this.reconnectAttempts = reconnectAttempts;
this.useDuplicateDetection = useDuplicateDetection;
@@ -188,7 +215,12 @@
final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
- final long retryInterval,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
+ final long retryInterval,
+ final double retryIntervalMultiplier,
+ final long maxRetryInterval,
+ final int reconnectAttempts,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
final int confirmationWindowSize,
@@ -218,7 +250,17 @@
this.address = address;
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+
+ this.connectionTTL = connectionTTL;
+
this.retryInterval = retryInterval;
+
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+
+ this.maxRetryInterval = maxRetryInterval;
+
+ this.reconnectAttempts = reconnectAttempts;
this.useDuplicateDetection = useDuplicateDetection;
@@ -355,13 +397,26 @@
{
serverLocator.setNodeID(nodeUUID.toString());
- serverLocator.setReconnectAttempts(-1);
+ serverLocator.setReconnectAttempts(reconnectAttempts);
serverLocator.setClusterConnection(true);
serverLocator.setClusterTransportConfiguration(connector);
serverLocator.setBackup(server.getConfiguration().isBackup());
serverLocator.setInitialConnectAttempts(-1);
- serverLocator.setConfirmationWindowSize(0);
+ if (serverLocator.getConfirmationWindowSize() < 0)
+ {
+ // We can't have confirmationSize = -1 on the cluster Bridge
+ // Otherwise we won't have confirmation working
+ serverLocator.setConfirmationWindowSize(0);
+ }
+
+ if (!useDuplicateDetection)
+ {
+ log.debug("DuplicateDetection is disabled, sending clustered messages
blocked");
+ }
+ // if not using duplicate detection, we will send blocked
+ serverLocator.setBlockOnDurableSend(!useDuplicateDetection);
+ serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
if(retryInterval > 0)
{
@@ -393,7 +448,7 @@
public synchronized void nodeDown(final String nodeID)
{
- log.debug("node " + nodeID + " being considered down on cluster
connection for nodeID=" + nodeUUID);
+ log.debug("node " + nodeID + " being considered down on cluster
connection for nodeID=" + nodeUUID, new Exception ("trace"));
if (nodeID.equals(nodeUUID.toString()))
{
return;
@@ -407,7 +462,11 @@
{
try
{
- record.reset();
+ if (isTrace)
+ {
+ log.trace("Closing clustering record " + record);
+ }
+ record.pause();
}
catch (Exception e)
{
@@ -485,11 +544,11 @@
}
else
{
- // FIXME apple and orange comparison. I don't understand it...
- //if
(!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
- // {
- // // New live node - close it and recreate it - TODO - CAN THIS EVER
HAPPEN?
- //}
+ log.info("Reattaching nodeID=" + nodeID);
+ if (record.isPaused())
+ {
+ record.resume();
+ }
}
}
catch (Exception e)
@@ -502,6 +561,11 @@
public void nodeAnnounced(final String nodeID,
final Pair<TransportConfiguration,
TransportConfiguration> connectorPair)
{
+ if (isTrace)
+ {
+ log.trace("nodeAnnouncedUp:" + nodeID);
+ }
+
if (nodeID.equals(nodeUUID.toString()))
{
return;
@@ -510,6 +574,10 @@
// if the node is more than 1 hop away, we do not create a bridge for direct
cluster connection
if (allowDirectConnectionsOnly &&
!allowableConnections.contains(connectorPair.a))
{
+ if (isTrace)
+ {
+ log.trace("Ignoring nodeUp message as it only allows direct
connections");
+ }
return;
}
@@ -517,20 +585,32 @@
// and empty static connectors to create bridges... ulgy!
if (serverLocator == null)
{
+ if (isTrace)
+ {
+ log.trace("Ignoring nodeUp as serverLocator==null");
+ }
return;
}
/*we dont create bridges to backups*/
if(connectorPair.a == null)
{
+ if (isTrace)
+ {
+ log.trace("Igoring nodeup as connectorPair.a==null (backup)");
+ }
return;
}
synchronized (records)
{
+ if (isTrace)
+ {
+ log.trace("Adding record for nodeID=" + nodeID);
+ }
try
{
MessageFlowRecord record = records.get(nodeID);
-
+
if (record == null)
{
// New node - create a new flow record
@@ -556,11 +636,10 @@
}
else
{
- // FIXME apple and orange comparison. I don't understand it...
- //if
(!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
- // {
- // // New live node - close it and recreate it - TODO - CAN THIS EVER
HAPPEN?
- //}
+ if (isTrace)
+ {
+ log.trace("It already had a node created before, ignoring the
nodeUp message");
+ }
}
}
catch (Exception e)
@@ -576,13 +655,32 @@
final Queue queue,
final boolean start) throws Exception
{
- MessageFlowRecordImpl record = new MessageFlowRecordImpl(queue);
+ MessageFlowRecordImpl record = new MessageFlowRecordImpl(nodeID, connector,
queueName, queue);
- Bridge bridge = new ClusterConnectionBridge(serverLocator,
+ records.put(nodeID, record);
+
+ Bridge bridge = createBridge(record);
+
+ record.setBridge(bridge);
+
+ if (start)
+ {
+ bridge.start();
+ }
+ }
+
+ /**
+ * @param record
+ * @return
+ * @throws Exception
+ */
+ protected Bridge createBridge(MessageFlowRecordImpl record) throws Exception
+ {
+ ClusterConnectionBridge bridge = new ClusterConnectionBridge(serverLocator,
nodeUUID,
- nodeID,
- queueName,
- queue,
+ record.getNodeID(),
+ record.getQueueName(),
+ record.getQueue(),
executorFactory.getExecutor(),
null,
null,
@@ -596,16 +694,9 @@
managementService.getManagementAddress(),
managementService.getManagementNotificationAddress(),
record,
- connector);
+ record.getConnector());
- record.setBridge(bridge);
-
- records.put(nodeID, record);
-
- if (start)
- {
- bridge.start();
- }
+ return bridge;
}
// Inner classes
-----------------------------------------------------------------------------------
@@ -614,15 +705,28 @@
{
private Bridge bridge;
+ private final String nodeID;
+ private final TransportConfiguration connector;
+ private final SimpleString queueName;
private final Queue queue;
private final Map<SimpleString, RemoteQueueBinding> bindings = new
HashMap<SimpleString, RemoteQueueBinding>();
+
+ private volatile boolean isClosed = false;
+ private volatile boolean paused = false;
+
private volatile boolean firstReset = false;
- public MessageFlowRecordImpl(final Queue queue)
+ public MessageFlowRecordImpl(final String nodeID,
+ final TransportConfiguration connector,
+ final SimpleString queueName,
+ final Queue queue)
{
this.queue = queue;
+ this.nodeID = nodeID;
+ this.connector = connector;
+ this.queueName = queueName;
}
public String getAddress()
@@ -630,6 +734,38 @@
return address.toString();
}
+ /**
+ * @return the nodeID
+ */
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ /**
+ * @return the connector
+ */
+ public TransportConfiguration getConnector()
+ {
+ return connector;
+ }
+
+ /**
+ * @return the queueName
+ */
+ public SimpleString getQueueName()
+ {
+ return queueName;
+ }
+
+ /**
+ * @return the queue
+ */
+ public Queue getQueue()
+ {
+ return queue;
+ }
+
public int getMaxHops()
{
return maxHops;
@@ -637,17 +773,47 @@
public void close() throws Exception
{
+ if (isTrace)
+ {
+ log.trace("Stopping bridge " + bridge);
+ }
+
+ isClosed = true;
+ clearBindings();
+
bridge.stop();
+ }
- clearBindings();
+ public void pause() throws Exception
+ {
+ paused = true;
+ clearBindings();
+ bridge.pause();
}
+ public boolean isPaused()
+ {
+ return paused;
+ }
+
+ public void resume() throws Exception
+ {
+ paused = false;
+ bridge.resume();
+ }
+
+ public boolean isClosed()
+ {
+ return isClosed;
+ }
+
public void reset() throws Exception
{
clearBindings();
}
- public void setBridge(final Bridge bridge)
+
+ public void setBridge(final Bridge bridge)
{
this.bridge = bridge;
}
@@ -659,6 +825,10 @@
public synchronized void onMessage(final ClientMessage message)
{
+ if (isTrace)
+ {
+ log.trace("Receiving message " + message);
+ }
try
{
// Reset the bindings
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -17,7 +17,12 @@
import java.lang.reflect.Array;
import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -32,7 +37,10 @@
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMember;
-import org.hornetq.core.config.*;
+import org.hornetq.core.config.BridgeConfiguration;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
@@ -682,8 +690,16 @@
serverLocator.setReconnectAttempts(config.getReconnectAttempts());
serverLocator.setRetryInterval(config.getRetryInterval());
serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
+ serverLocator.setMaxRetryInterval(config.getMaxRetryInterval());
serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
serverLocator.setInitialConnectAttempts(config.getReconnectAttempts());
+ serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
+ serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
+ if (!config.isUseDuplicateDetection())
+ {
+ log.debug("Bridge " + config.getName() +
+ " is configured to not use duplicate detecion, it will send
messages synchronously");
+ }
clusterLocators.add(serverLocator);
Bridge bridge = new BridgeImpl(serverLocator,
nodeUUID,
@@ -710,14 +726,21 @@
}
}
- public synchronized void destroyBridge(final String name) throws Exception
+ public void destroyBridge(final String name) throws Exception
{
- Bridge bridge = bridges.remove(name);
- if (bridge != null)
+ Bridge bridge;
+
+ synchronized (this)
{
- bridge.stop();
- managementService.unregisterBridge(name);
+ bridge = bridges.remove(name);
+ if (bridge != null)
+ {
+ bridge.stop();
+ managementService.unregisterBridge(name);
+ }
}
+
+ bridge.flushExecutor();
}
private synchronized void deployClusterConnection(final ClusterConnectionConfiguration
config) throws Exception
@@ -769,7 +792,12 @@
connector,
new
SimpleString(config.getName()),
new
SimpleString(config.getAddress()),
+
config.getClientFailureCheckPeriod(),
+ config.getConnectionTTL(),
config.getRetryInterval(),
+
config.getRetryIntervalMultiplier(),
+ config.getMaxRetryInterval(),
+ config.getReconnectAttempts(),
config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(),
config.getConfirmationWindowSize(),
@@ -794,7 +822,12 @@
connector,
new
SimpleString(config.getName()),
new
SimpleString(config.getAddress()),
+
config.getClientFailureCheckPeriod(),
+ config.getConnectionTTL(),
config.getRetryInterval(),
+
config.getRetryIntervalMultiplier(),
+ config.getMaxRetryInterval(),
+ config.getReconnectAttempts(),
config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(),
config.getConfirmationWindowSize(),
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -63,13 +63,11 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
-import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.DuplicateIDCache;
@@ -159,6 +157,8 @@
private final MBeanServer mbeanServer;
private volatile boolean started;
+
+ private volatile boolean stopped;
private volatile SecurityStore securityStore;
@@ -353,6 +353,11 @@
nodeManager.startLiveNode();
+ if (stopped)
+ {
+ return;
+ }
+
initialisePart2();
log.info("Server is now live");
@@ -379,6 +384,8 @@
private class SharedStoreBackupActivation implements Activation
{
+
+ volatile boolean closed = false;
public void run()
{
try
@@ -397,6 +404,11 @@
configuration.setBackup(false);
+ if (stopped)
+ {
+ return;
+ }
+
initialisePart2();
clusterManager.activate();
@@ -480,6 +492,7 @@
backupActivationThread.interrupt();
+ // TODO: do we really need this?
Thread.sleep(1000);
}
@@ -536,6 +549,8 @@
public synchronized void start() throws Exception
{
+ stopped = false;
+
initialiseLogging();
checkJournalDirectory();
@@ -618,6 +633,7 @@
public void stop() throws Exception
{
+ stopped = true;
stop(configuration.isFailoverOnServerShutdown());
}
@@ -1463,7 +1479,13 @@
private void initialisePart2() throws Exception
{
// Load the journal and populate queues, transactions and caches in memory
-
+
+
+ if (stopped)
+ {
+ return;
+ }
+
pagingManager.reloadStores();
JournalLoadInformation[] journalInfo = loadJournals();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -853,6 +853,7 @@
public synchronized void cancel(final MessageReference reference, final long timeBase)
throws Exception
{
+ deliveringCount.decrementAndGet();
if (checkRedelivery(reference, timeBase))
{
if (!scheduledDeliveryHandler.checkAndSchedule(reference, false))
@@ -870,7 +871,7 @@
{
if (isTrace)
{
- log.trace("moving expired reference " + ref + " to address =
" + expiryAddress + " from queue=" + this.getName(), new Exception
("trace"));
+ log.trace("moving expired reference " + ref + " to address =
" + expiryAddress + " from queue=" + this.getName());
}
move(expiryAddress, ref, true, false);
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -88,6 +88,8 @@
// Constants
-----------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
// Static
-------------------------------------------------------------------------------
@@ -598,6 +600,10 @@
public void commit() throws Exception
{
+ if (isTrace)
+ {
+ log.trace("Calling commit");
+ }
try
{
tx.commit();
@@ -1075,7 +1081,7 @@
public void send(final ServerMessage message, final boolean direct) throws Exception
{
long id = storageManager.generateUniqueID();
-
+
SimpleString address = message.getAddress();
message.setMessageID(id);
@@ -1096,6 +1102,12 @@
}
}
+ if (isTrace)
+ {
+ log.trace("send(message=" + message + ", direct=" + direct +
") being called");
+ }
+
+
if (message.getAddress().equals(managementAddress))
{
// It's a management message
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -84,6 +84,8 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(ManagementServiceImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
private final MBeanServer mbeanServer;
@@ -635,6 +637,12 @@
public void sendNotification(final Notification notification) throws Exception
{
+ if (isTrace)
+ {
+ log.trace("Sending Notification = " + notification +
+ ", notificationEnabled=" + notificationsEnabled +
+ " messagingServerControl=" + messagingServerControl, new
Exception ("trace"));
+ }
if (messagingServerControl != null && notificationsEnabled)
{
// This needs to be synchronized since we need to ensure notifications are
processed in strict sequence
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -21,10 +21,15 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -118,12 +123,14 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
false,
confirmationWindowSize,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -260,12 +267,14 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
false,
confirmationWindowSize,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -386,12 +395,14 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
false,
confirmationWindowSize,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -527,12 +538,14 @@
forwardAddress,
null,
null,
+
clientFailureCheckPeriod,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
false,
confirmationWindowSize,
-
clientFailureCheckPeriod,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -661,12 +674,14 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
false,
confirmationWindowSize,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -104,12 +104,14 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
0,
true,
1024,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -176,6 +178,8 @@
Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
bridge.stop();
+
+ bridge.flushExecutor();
for (int i = 0; i < numMessages; i++)
{
@@ -267,12 +271,14 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
500,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
true,
1024,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -478,12 +484,14 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
0,
false,
1024,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -629,12 +637,14 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
1,
true,
1024,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -706,6 +716,8 @@
BridgeStartTest.log.info("stopping bridge manually");
bridge.stop();
+
+ bridge.flushExecutor();
for (int i = numMessages; i < numMessages * 2; i++)
{
@@ -739,6 +751,8 @@
Assert.assertNull(consumer1.receiveImmediate());
bridge.stop();
+
+ bridge.flushExecutor();
for (int i = 0; i < numMessages; i++)
{
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -139,14 +139,16 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
false,
// Choose
confirmation size to make sure acks
// are sent
numMessages *
messageSize / 2,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorConfig,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -339,12 +341,14 @@
forwardAddress,
filterString,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
false,
1024,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -510,12 +514,14 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
100,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
false,
1024,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -665,12 +671,14 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
100,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
true,
0,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -853,12 +861,14 @@
forwardAddress,
null,
SimpleTransformer.class.getName(),
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
false,
1024,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -1001,12 +1011,14 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
false,
0,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -1242,12 +1254,14 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
false,
1024,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -1385,14 +1399,16 @@
// address
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
false,
// Choose
confirmation size to make sure acks
// are sent
numMessages *
messageSize / 2,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -116,12 +116,14 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
0,
true,
1024,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -188,6 +190,7 @@
Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
bridge.stop();
+ bridge.flushExecutor();
for (int i = 0; i < numMessages; i++)
{
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -113,13 +113,15 @@
forwardAddress,
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
false,
confirmationWindowSize,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- staticConnectors,
+
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -1812,6 +1812,7 @@
* we need to wait a lil while between server start up to allow the server to
communicate in some order.
* This is to avoid split brain on startup
* */
+ // TODO: Do we really need this?
Thread.sleep(500);
}
for (int node : nodes)
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -119,7 +119,7 @@
send(0, "queues.testaddress", 10, false, null);
verifyNotReceive(0);
}
-
+
public void testStopAndStartTarget() throws Exception
{
startServers(0, 1);
@@ -150,9 +150,14 @@
stopServers(1);
- OnewayTwoNodeClusterTest.log.info("restarting server 1");
+ OnewayTwoNodeClusterTest.log.info("restarting server 1(" +
servers[1].getIdentity() + ")");
startServers(1);
+
+ //Thread.sleep(1000);
+
+ log.info("Server 1 id=" + servers[1].getNodeID());
+
long end = System.currentTimeMillis();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -140,6 +140,7 @@
staticConnectors, false);
backupConfig.getClusterConfigurations().add(cccLive);
backupServer = createBackupServer();
+ backupServer.getServer().setIdentity("bkpIdentityServer");
liveConfig = super.createDefaultConfig();
liveConfig.getAcceptorConfigurations().clear();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -69,10 +69,11 @@
printBindings(2);
- sendInRange(1, "queues.testaddress", 0, 10, false, null);
+ sendInRange(1, "queues.testaddress", 0, 10, true, null);
System.out.println("stopping******************************************************");
stopServers(0);
+ Thread.sleep(2000);
System.out.println("stopped******************************************************");
startServers(0);
@@ -87,7 +88,7 @@
sendInRange(1, "queues.testaddress", 10, 20, false, null);
- verifyReceiveAllInRange(10, 20, 0);
+ verifyReceiveAllInRange(0, 20, 0);
System.out.println("*****************************************************************************");
}
finally
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -166,12 +166,14 @@
targetQueueConfig.getAddress(),
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
RandomUtil.randomPositiveLong(),
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
RandomUtil.randomDouble(),
RandomUtil.randomPositiveInt(),
RandomUtil.randomBoolean(),
RandomUtil.randomPositiveInt(),
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -144,12 +144,14 @@
targetQueueConfig.getAddress(),
null,
null,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
RandomUtil.randomPositiveLong(),
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
RandomUtil.randomDouble(),
RandomUtil.randomPositiveInt(),
RandomUtil.randomBoolean(),
RandomUtil.randomPositiveInt(),
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2011-06-15
14:15:02 UTC (rev 10810)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -87,7 +87,7 @@
{
final FakeBinding fake = new FakeBinding(new SimpleString("a"));
- final BindingsImpl bind = new BindingsImpl(null, null);
+ final BindingsImpl bind = new BindingsImpl(null, null, null);
bind.addBinding(fake);
bind.addBinding(new FakeBinding(new SimpleString("a")));
bind.addBinding(new FakeBinding(new SimpleString("a")));
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-06-15
14:15:02 UTC (rev 10810)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-06-15
15:15:58 UTC (rev 10811)
@@ -971,7 +971,15 @@
}
// We shutdown the global pools to give a better isolation between tests
- ServerLocatorImpl.clearThreadPools();
+ try
+ {
+ ServerLocatorImpl.clearThreadPools();
+ }
+ catch (Throwable e)
+ {
+ log.info(threadDump(e.getMessage()));
+ throw new RuntimeException (e.getMessage(), e);
+ }
}
protected byte[] autoEncode(final Object... args)