Author: clebert.suconic@jboss.com
Date: 2011-09-09 16:31:28 -0400 (Fri, 09 Sep 2011)
New Revision: 11314
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
Modified:
branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/Channel.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JMSMessageCounterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Fixes on Clustering
Modified: branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
===================================================================
--- branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-09-09 20:31:28 UTC (rev 11314)
@@ -2,8 +2,8 @@
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=8
-hornetq.version.incrementingVersion=121
+hornetq.version.incrementingVersion=122
hornetq.version.versionSuffix=CR1
hornetq.version.versionTag=CR1
hornetq.netty.version=@NETTY.VERSION@
-hornetq.version.compatibleVersionList=121
+hornetq.version.compatibleVersionList=121,122
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -26,7 +26,7 @@
*/
public interface ClusterTopologyListener
{
-   void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+   void nodeUP(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);

-   void nodeDown(String nodeID);
+   void nodeDown(long eventUID, String nodeID);
 }
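Existing ClusterTopologyListener implementations have to be updated for the extra eventUID parameter. A minimal sketch of an implementation against the new signatures (the class name and the println calls are illustrative only, not part of this commit):

    import org.hornetq.api.core.Pair;
    import org.hornetq.api.core.TransportConfiguration;
    import org.hornetq.api.core.client.ClusterTopologyListener;

    // Hypothetical listener that just prints the events it receives.
    public class PrintingTopologyListener implements ClusterTopologyListener
    {
       public void nodeUP(long eventUID, String nodeID,
                          Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                          boolean last)
       {
          System.out.println("nodeUP " + nodeID + " eventUID=" + eventUID +
                             " connectors=" + connectorPair + " last=" + last);
       }

       public void nodeDown(long eventUID, String nodeID)
       {
          System.out.println("nodeDown " + nodeID + " eventUID=" + eventUID);
       }
    }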
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+/**
+ * To be called right after the ConnectionFactory created a connection.
+ * This listener is not part of the API and shouldn't be used by users.
+ * (if you do so we can't guarantee any API compatibility on this class)
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public interface AfterConnectInternalListener
+{
+ void onConnection(ClientSessionFactoryInternal sf);
+}
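The listener is wired in through ServerLocatorInternal.setAfterConnectionInternalListener (added below in ServerLocatorImpl/ServerLocatorInternal). A rough sketch of how it could be registered, assuming the public locator is cast to the internal interface (internal API, no compatibility guarantee; the Netty connector class name is only illustrative):

    import org.hornetq.api.core.TransportConfiguration;
    import org.hornetq.api.core.client.HornetQClient;
    import org.hornetq.core.client.impl.AfterConnectInternalListener;
    import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
    import org.hornetq.core.client.impl.ServerLocatorInternal;

    public class AfterConnectExample
    {
       public static void main(String[] args) throws Exception
       {
          // ServerLocatorImpl implements ServerLocatorInternal, so the internal hook
          // is reachable by casting the public locator.
          ServerLocatorInternal locator = (ServerLocatorInternal) HornetQClient
             .createServerLocatorWithoutHA(new TransportConfiguration(
                "org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"));

          locator.setAfterConnectionInternalListener(new AfterConnectInternalListener()
          {
             public void onConnection(ClientSessionFactoryInternal sf)
             {
                // Called right after the factory establishes its connection; the cluster
                // connection uses this hook to announce the node via sendNodeAnnounce.
                System.out.println("connected through " + sf.getConnectorConfiguration());
             }
          });

          ClientSessionFactoryInternal sf = locator.connect();
          sf.close();
          locator.close();
       }
    }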
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -43,12 +43,13 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
-import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.version.Version;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -1291,21 +1292,16 @@
ClientSessionFactoryImpl.log.trace(this + "::Subscribing
Topology");
}
- channel0.send(new
SubscribeClusterTopologyUpdatesMessage(serverLocator.isClusterConnection()));
- if (serverLocator.isClusterConnection())
- {
- TransportConfiguration config =
serverLocator.getClusterTransportConfiguration();
- if (ClientSessionFactoryImpl.isDebug)
- {
- ClientSessionFactoryImpl.log.debug("Announcing node " +
serverLocator.getNodeID() +
- ", isBackup=" +
- serverLocator.isBackup());
- }
- channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(),
serverLocator.isBackup(), config));
- }
+ channel0.send(new
SubscribeClusterTopologyUpdatesMessageV2(serverLocator.isClusterConnection(),
VersionLoader.getVersion().getIncrementingVersion()));
+
}
}
+ if (serverLocator.getAfterConnectInternalListener() != null)
+ {
+ serverLocator.getAfterConnectInternalListener().onConnection(this);
+      }
+
if (ClientSessionFactoryImpl.log.isTraceEnabled())
{
ClientSessionFactoryImpl.log.trace("returning " + connection);
@@ -1314,6 +1310,20 @@
return connection;
}
+ /**
+ * @param channel0
+ */
+   public void sendNodeAnnounce(final long currentEventID, String nodeID, boolean isBackup, TransportConfiguration config, TransportConfiguration backupConfig)
+ {
+ Channel channel0 = connection.getChannel(0, -1);
+ if (ClientSessionFactoryImpl.isDebug)
+ {
+ ClientSessionFactoryImpl.log.debug("Announcing node " +
serverLocator.getNodeID() +
+ ", isBackup=" + isBackup);
+ }
+      channel0.send(new NodeAnnounceMessage(currentEventID, nodeID, isBackup, config, backupConfig));
+ }
+
@Override
public void finalize() throws Throwable
{
@@ -1439,7 +1449,7 @@
if (nodeID != null)
{
- serverLocator.notifyNodeDown(msg.getNodeID().toString());
+            serverLocator.notifyNodeDown(System.currentTimeMillis(), msg.getNodeID().toString());
}
closeExecutor.execute(new Runnable()
@@ -1464,7 +1474,7 @@
{
ClientSessionFactoryImpl.log.debug("Notifying " +
topMessage.getNodeID() + " going down");
}
- serverLocator.notifyNodeDown(topMessage.getNodeID());
+            serverLocator.notifyNodeDown(System.currentTimeMillis(), topMessage.getNodeID());
}
else
{
@@ -1478,9 +1488,36 @@
" csf created
at\nserverLocator=" +
serverLocator, e);
}
- serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(),
topMessage.isLast());
+ serverLocator.notifyNodeUp(System.currentTimeMillis(),
topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
}
}
+ else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2)
+ {
+         ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2)packet;
+
+ if (topMessage.isExit())
+ {
+ if (ClientSessionFactoryImpl.isDebug)
+ {
+ ClientSessionFactoryImpl.log.debug("Notifying " +
topMessage.getNodeID() + " going down");
+ }
+ serverLocator.notifyNodeDown(topMessage.getUniqueEventID(),
topMessage.getNodeID());
+ }
+ else
+ {
+ if (ClientSessionFactoryImpl.isDebug)
+ {
+ ClientSessionFactoryImpl.log.debug("Node " +
topMessage.getNodeID() +
+ " going up, connector = "
+
+ topMessage.getPair() +
+ ", isLast=" +
+ topMessage.isLast() +
+ " csf created
at\nserverLocator=" +
+ serverLocator, e);
+ }
+ serverLocator.notifyNodeUp(topMessage.getUniqueEventID(),
topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+ }
+ }
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -42,6 +42,8 @@
void removeSession(final ClientSessionInternal session, boolean failingOver);
void connect(int reconnectAttempts, boolean failoverOnInitialConnection) throws
HornetQException;
+
+   void sendNodeAnnounce(final long currentEventID, String nodeID, boolean isBackup, TransportConfiguration config, TransportConfiguration backupConfig);
TransportConfiguration getConnectorConfiguration();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -165,6 +165,8 @@
private Executor startExecutor;
private static ScheduledExecutorService globalScheduledThreadPool;
+
+ private AfterConnectInternalListener afterConnectListener;
private String groupID;
@@ -483,7 +485,9 @@
* @param discoveryAddress
* @param discoveryPort
*/
- public ServerLocatorImpl(final Topology topology, final boolean useHA, final
DiscoveryGroupConfiguration groupConfiguration)
+ public ServerLocatorImpl(final Topology topology,
+ final boolean useHA,
+ final DiscoveryGroupConfiguration groupConfiguration)
{
this(topology, useHA, groupConfiguration, null);
@@ -545,6 +549,11 @@
}
});
}
+
+ public Executor getExecutor()
+ {
+ return startExecutor;
+ }
/* (non-Javadoc)
* @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
@@ -554,7 +563,7 @@
finalizeCheck = false;
}
- public ClientSessionFactory connect() throws Exception
+ public ClientSessionFactoryInternal connect() throws Exception
{
ClientSessionFactoryInternal sf;
// static list of initial connectors
@@ -571,6 +580,19 @@
return sf;
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.client.impl.ServerLocatorInternal#setAfterConnectionInternalListener(org.hornetq.core.client.impl.AfterConnectInternalListener)
+ */
+ public void setAfterConnectionInternalListener(AfterConnectInternalListener listener)
+ {
+ this.afterConnectListener = listener;
+ }
+
+ public AfterConnectInternalListener getAfterConnectInternalListener()
+ {
+ return afterConnectListener;
+ }
+
public boolean isClosed()
{
return closed || closing;
@@ -1244,7 +1266,10 @@
closed = true;
}
- public void notifyNodeDown(final String nodeID)
+ /** This is directly called when the connection to the node is gone,
+ * or when the node sends a disconnection.
+ * Look for callers of this method! */
+ public void notifyNodeDown(final long eventTime, final String nodeID)
{
if (topology == null)
@@ -1258,27 +1283,31 @@
log.debug("nodeDown " + this + " nodeID=" + nodeID + "
as being down", new Exception("trace"));
}
- topology.removeMember(nodeID);
-
- if (!topology.isEmpty())
+ if (topology.removeMember(eventTime, nodeID))
{
- updateArraysAndPairs();
+ if (topology.isEmpty())
+ {
+ // Resetting the topology to its original condition as it was brand new
+ topologyArray = null;
- if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
- {
receivedTopology = false;
}
- }
- else
- {
- topologyArray = null;
+ else
+ {
+ updateArraysAndPairs();
- receivedTopology = false;
+ if (topology.nodes() == 1 && topology.getMember(this.nodeID) !=
null)
+ {
+ // Resetting the topology to its original condition as it was brand new
+ receivedTopology = false;
+ }
+ }
}
}
- public void notifyNodeUp(final String nodeID,
+ public void notifyNodeUp(long uniqueEventID,
+ final String nodeID,
final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
final boolean last)
{
@@ -1293,33 +1322,33 @@
log.debug("NodeUp " + this + "::nodeID=" + nodeID + ",
connectorPair=" + connectorPair, new Exception("trace"));
}
- topology.addMember(nodeID, new TopologyMember(connectorPair), last);
+ TopologyMember member = new TopologyMember(connectorPair.a, connectorPair.b);
- TopologyMember actMember = topology.getMember(nodeID);
+ if (topology.updateMember(uniqueEventID, nodeID, member))
+ {
- if (actMember != null && actMember.getConnector().a != null &&
actMember.getConnector().b != null)
- {
- for (ClientSessionFactory factory : factories)
+ TopologyMember actMember = topology.getMember(nodeID);
+
+ if (actMember != null && actMember.getConnector().a != null &&
actMember.getConnector().b != null)
{
-
((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
-
actMember.getConnector().b);
+ for (ClientSessionFactory factory : factories)
+ {
+
((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
+
actMember.getConnector().b);
+ }
}
- }
- if (connectorPair.a != null)
- {
updateArraysAndPairs();
}
- synchronized (this)
+ if (last)
{
- if (last)
+ synchronized (this)
{
receivedTopology = true;
+ // Notify if waiting on getting topology
+ notifyAll();
}
-
- // Notify if waiting on getting topology
- notifyAll();
}
}
@@ -1569,9 +1598,9 @@
threadPool,
scheduledThreadPool,
interceptors);
-
+
factory.disableFinalizeCheck();
-
+
connectors.add(new Connector(initialConnector, factory));
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -31,8 +31,14 @@
{
void start(Executor executor) throws Exception;
+ Executor getExecutor();
+
void factoryClosed(final ClientSessionFactory factory);
+ AfterConnectInternalListener getAfterConnectInternalListener();
+
+ void setAfterConnectionInternalListener(AfterConnectInternalListener listener);
+
/** Used to better identify Cluster Connection Locators on logs while debugging logs
*/
void setIdentity(String identity);
@@ -42,11 +48,16 @@
void cleanup();
- ClientSessionFactory connect() throws Exception;
+ ClientSessionFactoryInternal connect() throws Exception;
-   void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+   void notifyNodeUp(long uniqueEventID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
- void notifyNodeDown(String nodeID);
+ /**
+ *
+ * @param uniqueEventID 0 means get the previous ID +1
+ * @param nodeID
+ */
+ void notifyNodeDown(long uniqueEventID, String nodeID);
void setClusterConnection(boolean clusterConnection);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-09-09
18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -36,16 +36,12 @@
public class Topology implements Serializable
{
- private static final int BACKOF_TIMEOUT = 500;
-
private static final long serialVersionUID = -9037171688692471371L;
private final Set<ClusterTopologyListener> topologyListeners = new
HashSet<ClusterTopologyListener>();
private static final Logger log = Logger.getLogger(Topology.class);
- private transient HashMap<String, Pair<Long, Integer>> mapBackof = new
HashMap<String, Pair<Long, Integer>>();
-
private Executor executor = null;
/** Used to debug operations.
@@ -65,6 +61,8 @@
*/
   private final Map<String, TopologyMember> mapTopology = new ConcurrentHashMap<String, TopologyMember>();
+   private final Map<String, Long> mapDelete = new ConcurrentHashMap<String, Long>();
+
public Topology(final Object owner)
{
this.owner = owner;
@@ -104,93 +102,142 @@
}
}
-   public boolean addMember(final String nodeId, final TopologyMember memberInput, final boolean last)
+   /** This is called by the server when the node is activated from backup state. It will always succeed */
+ public void updateAsLive(final String nodeId, final TopologyMember memberInput)
{
synchronized (this)
{
- TopologyMember currentMember = mapTopology.get(nodeId);
+ if (log.isDebugEnabled())
+ {
+ log.info(this + "::Live node " + nodeId + "=" +
memberInput);
+ }
+ memberInput.setUniqueEventID(System.currentTimeMillis());
+ mapTopology.remove(nodeId);
+ mapTopology.put(nodeId, memberInput);
+ sendMemberUp(memberInput.getUniqueEventID(), nodeId, memberInput);
+ }
+ }
+   /** This is called by the server when the node is activated from backup state. It will always succeed */
+   public TopologyMember updateBackup(final String nodeId, final TopologyMember memberInput)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::updateBackup::" + nodeId + ",
memberInput=" + memberInput);
+ }
+
+ synchronized (this)
+ {
+ TopologyMember currentMember = getMember(nodeId);
if (currentMember == null)
{
- if (!testBackof(nodeId))
- {
- return false;
- }
+ log.warn("There's no live to be updated on backup update,
node=" + nodeId + " memberInput=" + memberInput,
+ new Exception("trace"));
+ currentMember = memberInput;
+ mapTopology.put(nodeId, currentMember);
+ }
+
+ TopologyMember newMember = new TopologyMember(currentMember.getA(),
memberInput.getB());
+ newMember.setUniqueEventID(System.currentTimeMillis());
+ mapTopology.remove(nodeId);
+ mapTopology.put(nodeId, newMember);
+ sendMemberUp(newMember.getUniqueEventID(), nodeId, newMember);
+
+ return newMember;
+ }
+ }
+
+ /**
+ *
+ * @param <p>uniqueIdentifier an unique identifier for when the change was made
+ * We will use current time millis for starts, and a ++ of that number for
shutdown. </p>
+ * @param nodeId
+ * @param memberInput
+ * @return
+ */
+   public boolean updateMember(final long uniqueEventID, final String nodeId, final TopologyMember memberInput)
+ {
+
+ Long deleteTme = mapDelete.get(nodeId);
+ if (deleteTme != null && uniqueEventID < deleteTme)
+ {
+ log.debug("Update uniqueEvent=" + uniqueEventID +
+ ", nodeId=" +
+ nodeId +
+ ", memberInput=" +
+ memberInput +
+ " being rejected as there was a delete done after that");
+ return false;
+ }
+
+ synchronized (this)
+ {
+ TopologyMember currentMember = mapTopology.get(nodeId);
+
+ if (currentMember == null)
+ {
if (Topology.log.isDebugEnabled())
{
- Topology.log.debug(this + "::NewMemeberAdd " + this +
- " MEMBER WAS NULL, Add member nodeId=" +
+ Topology.log.debug(this + "::NewMemeberAdd nodeId=" +
nodeId +
" member = " +
- memberInput +
- " size = " +
- mapTopology.size(), new Exception("trace"));
+ memberInput, new Exception("trace"));
}
+ memberInput.setUniqueEventID(uniqueEventID);
mapTopology.put(nodeId, memberInput);
- sendMemberUp(nodeId, memberInput);
+ sendMemberUp(uniqueEventID, nodeId, memberInput);
return true;
}
else
{
- if (log.isTraceEnabled())
+ if (uniqueEventID > currentMember.getUniqueEventID())
{
- log.trace(this + ":: validating update for currentMember=" +
currentMember + " of memberInput=" + memberInput);
- }
+ TopologyMember newMember = new TopologyMember(memberInput.getA(),
memberInput.getB());
- boolean replaced = false;
- TopologyMember memberToSend = currentMember;
+ if (newMember.getA() == null && currentMember.getA() != null)
+ {
+ newMember.setA(currentMember.getA());
+ }
- if (hasChanged("a", memberToSend.getConnector().a,
memberInput.getConnector().a))
- {
- if (!replaced && !testBackof(nodeId))
+ if (newMember.getB() == null && currentMember.getB() != null)
{
- return false;
+ newMember.setB(currentMember.getB());
}
- memberToSend = new TopologyMember(memberInput.getConnector().a,
memberToSend.getConnector().b);
- replaced = true;
- }
- if (hasChanged("b", memberToSend.getConnector().b,
memberInput.getConnector().b))
- {
- if (!replaced && !testBackof(nodeId))
+ if (log.isDebugEnabled())
{
- return false;
+ log.debug(this + "::updated currentMember=nodeID=" +
+ nodeId +
+ ", currentMember=" +
+ currentMember +
+ ", memberInput=" +
+ memberInput +
+ "newMember=" + newMember);
}
- memberToSend = new TopologyMember(memberToSend.getConnector().a,
memberInput.getConnector().b);
- replaced = true;
- }
- if (replaced)
- {
+
+ newMember.setUniqueEventID(uniqueEventID);
mapTopology.remove(nodeId);
- mapTopology.put(nodeId, memberToSend);
+ mapTopology.put(nodeId, newMember);
+ sendMemberUp(uniqueEventID, nodeId, newMember);
- sendMemberUp(nodeId, memberToSend);
return true;
}
-
+ else
+ {
+ return false;
+ }
}
}
-
- if (Topology.log.isDebugEnabled())
- {
- Topology.log.debug(Topology.this + " Add member nodeId=" +
- nodeId +
- " member = " +
- memberInput +
- " has been ignored since there was no change", new
Exception("trace"));
- }
-
- return false;
}
/**
* @param nodeId
* @param memberToSend
*/
- private void sendMemberUp(final String nodeId, final TopologyMember memberToSend)
+   private void sendMemberUp(final long uniqueEventID, final String nodeId, final TopologyMember memberToSend)
{
final ArrayList<ClusterTopologyListener> copy = copyListeners();
@@ -207,12 +254,17 @@
{
if (Topology.log.isTraceEnabled())
{
- Topology.log.trace(Topology.this + " informing " + listener +
" about node up = " + nodeId);
+ Topology.log.trace(Topology.this + " informing " +
+ listener +
+ " about node up = " +
+ nodeId +
+ " connector = " +
+ memberToSend.getConnector());
}
try
{
- listener.nodeUP(nodeId, memberToSend.getConnector(), false);
+                  listener.nodeUP(uniqueEventID, nodeId, memberToSend.getConnector(), false);
}
catch (Throwable e)
{
@@ -224,46 +276,6 @@
}
/**
- * @param nodeId
- * @param backOfData
- */
- private boolean testBackof(final String nodeId)
- {
- Pair<Long, Integer> backOfData = mapBackof.get(nodeId);
-
- if (backOfData != null)
- {
- backOfData.b += 1;
-
- long timeDiff = System.currentTimeMillis() - backOfData.a;
-
- // To prevent a loop where nodes are being considered down and up
- if (backOfData.b > 5 && timeDiff < BACKOF_TIMEOUT)
- {
-
- // The cluster may get in loop without this..
- // Case one node is stll sending nodeDown while another member is sending
nodeUp
- log.warn(backOfData.b + ", The topology controller identified a blast
events and it's interrupting the flow of the loop, nodeID=" +
- nodeId +
- ", topologyInstance=" +
- this,
- new Exception("this exception is just to trace
location"));
- return false;
- }
- else if (timeDiff < BACKOF_TIMEOUT)
- {
- log.warn(this + "::Simple blast of " + nodeId, new
Exception("this exception is just to trace location"));
- }
- else if (timeDiff >= BACKOF_TIMEOUT)
- {
- mapBackof.remove(nodeId);
- }
- }
-
- return true;
- }
-
- /**
* @return
*/
private ArrayList<ClusterTopologyListener> copyListeners()
@@ -276,28 +288,26 @@
return listenersCopy;
}
- public boolean removeMember(final String nodeId)
+ public boolean removeMember(final long uniqueEventID, final String nodeId)
{
TopologyMember member;
synchronized (this)
{
- Pair<Long, Integer> value = mapBackof.get(nodeId);
-
- if (value == null)
+ member = mapTopology.get(nodeId);
+ if (member != null)
{
- value = new Pair<Long, Integer>(0l, 0);
- mapBackof.put(nodeId, value);
+ if (member.getUniqueEventID() > uniqueEventID)
+ {
+ log.info("The removeMember was issued before the node " + nodeId
+ " was started, ignoring call");
+ member = null;
+ }
+ else
+ {
+ mapDelete.put(nodeId, uniqueEventID);
+ member = mapTopology.remove(nodeId);
+ }
}
-
- value.a = System.currentTimeMillis();
-
- if (System.currentTimeMillis() - value.a > BACKOF_TIMEOUT)
- {
- value.b = 0;
- }
-
- member = mapTopology.remove(nodeId);
}
if (Topology.log.isDebugEnabled())
@@ -327,7 +337,7 @@
}
try
{
- listener.nodeDown(nodeId);
+ listener.nodeDown(uniqueEventID, nodeId);
}
catch (Exception e)
{
@@ -354,14 +364,13 @@
}
/**
- * it will send all the member updates to listeners, independently of being changed or
not
+ * it will send the member to its listeners
* @param nodeID
* @param member
*/
- public void sendMemberToListeners(final String nodeID, final TopologyMember member)
+ public void sendMember(final String nodeID)
{
- // To make sure it was updated
- addMember(nodeID, member, false);
+ final TopologyMember member = getMember(nodeID);
final ArrayList<ClusterTopologyListener> copy = copyListeners();
@@ -380,7 +389,7 @@
" with connector=" +
member.getConnector());
}
- listener.nodeUP(nodeID, member.getConnector(), false);
+               listener.nodeUP(member.getUniqueEventID(), nodeID, member.getConnector(), false);
}
}
});
@@ -417,18 +426,21 @@
" to " +
listener);
}
-               listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == copy.size());
+ listener.nodeUP(entry.getValue().getUniqueEventID(),
+ entry.getKey(),
+ entry.getValue().getConnector(),
+ ++count == copy.size());
}
}
});
}
- public TopologyMember getMember(final String nodeID)
+ public synchronized TopologyMember getMember(final String nodeID)
{
return mapTopology.get(nodeID);
}
- public boolean isEmpty()
+ public synchronized boolean isEmpty()
{
return mapTopology.isEmpty();
}
@@ -448,11 +460,11 @@
int count = 0;
for (TopologyMember member : mapTopology.values())
{
- if (member.getConnector().a != null)
+ if (member.getA() != null)
{
count++;
}
- if (member.getConnector().b != null)
+ if (member.getB() != null)
{
count++;
}
@@ -499,30 +511,13 @@
this.owner = owner;
}
- private boolean hasChanged(final String debugInfo, final TransportConfiguration a,
final TransportConfiguration b)
- {
- boolean changed = a == null && b != null || a != null && b != null
&& !a.equals(b);
-
- if (log.isTraceEnabled())
- {
-
- log.trace(this + "::Validating current=" + a
- + " != input=" + b +
- (changed ? " and it has changed" : " and it didn't
change") +
- ", for validation of " +
- debugInfo);
- }
-
- return changed;
- }
-
public TransportConfiguration getBackupForConnector(final TransportConfiguration
connectorConfiguration)
{
for (TopologyMember member : mapTopology.values())
{
-         if (member.getConnector().a != null && member.getConnector().a.equals(connectorConfiguration))
+         if (member.getA() != null && member.getA().equals(connectorConfiguration))
{
- return member.getConnector().b;
+ return member.getB();
}
}
return null;
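Taken together, the Topology changes replace the old time-based back-off with strict event ordering: every member carries a uniqueEventID, updateMember rejects anything older than the member it already holds or older than a recorded delete, and removeMember ignores removals that predate the member's start. A stripped-down sketch of that ordering rule, separate from the real class (names are illustrative):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative only: the "newest event wins" rule applied by updateMember/removeMember.
    class EventOrderedMembers
    {
       private final Map<String, Long> current = new ConcurrentHashMap<String, Long>();
       private final Map<String, Long> deleted = new ConcurrentHashMap<String, Long>();

       synchronized boolean update(long eventID, String nodeID)
       {
          Long deleteTime = deleted.get(nodeID);
          if (deleteTime != null && eventID < deleteTime)
          {
             return false; // a later delete already superseded this update
          }
          Long existing = current.get(nodeID);
          if (existing != null && eventID <= existing)
          {
             return false; // stale update, ignore it
          }
          current.put(nodeID, eventID);
          return true;
       }

       synchronized boolean remove(long eventID, String nodeID)
       {
          Long existing = current.get(nodeID);
          if (existing == null || existing > eventID)
          {
             return false; // the node was (re)started after this remove was issued
          }
          deleted.put(nodeID, eventID);
          current.remove(nodeID);
          return true;
       }
    }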
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -27,21 +27,61 @@
private final Pair<TransportConfiguration, TransportConfiguration> connector;
-   public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector)
+ /** transient to avoid serialization changes */
+ private transient long uniqueEventID = System.currentTimeMillis();
+
+   public TopologyMember(final Pair<TransportConfiguration, TransportConfiguration> connector)
{
this.connector = connector;
+ uniqueEventID = System.currentTimeMillis();
}
- public TopologyMember(TransportConfiguration a, TransportConfiguration b)
+ public TopologyMember(final TransportConfiguration a, final TransportConfiguration b)
{
this(new Pair<TransportConfiguration, TransportConfiguration>(a, b));
}
+ public TransportConfiguration getA()
+ {
+ return connector.a;
+ }
+
+ public TransportConfiguration getB()
+ {
+ return connector.b;
+ }
+
+ public void setB(final TransportConfiguration param)
+ {
+ connector.b = param;
+ }
+
+ public void setA(final TransportConfiguration param)
+ {
+ connector.a = param;
+ }
+
+ /**
+ * @return the uniqueEventID
+ */
+ public long getUniqueEventID()
+ {
+ return uniqueEventID;
+ }
+
+ /**
+ * @param uniqueEventID the uniqueEventID to set
+ */
+ public void setUniqueEventID(final long uniqueEventID)
+ {
+ this.uniqueEventID = uniqueEventID;
+ }
+
public Pair<TransportConfiguration, TransportConfiguration> getConnector()
{
return connector;
}
-
+
@Override
public String toString()
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -242,6 +242,7 @@
try
{
clusterConnection.start();
+ clusterConnection.flushExecutor();
}
finally
{
@@ -255,6 +256,7 @@
try
{
clusterConnection.stop();
+ clusterConnection.flushExecutor();
}
finally
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -373,8 +373,12 @@
if (complete)
{
- log.info("Address " + pagingStore.getAddress() +
+ if (log.isDebugEnabled())
+ {
+ log.debug("Address " + pagingStore.getAddress() +
" is leaving page mode as all messages are consumed and
acknowledged from the page store");
+ }
+
pagingStore.forceAnotherPage();
Page currentPage = pagingStore.getCurrentPage();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/Channel.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/Channel.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/Channel.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -36,6 +36,9 @@
* @return the id
*/
long getID();
+
+ /** For protocol check */
+ boolean supports(byte packetID);
/**
* sends a packet on this channel.
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -25,6 +25,15 @@
*/
public interface CoreRemotingConnection extends RemotingConnection
{
+
+ /** The client protocol used on the communication.
+ * This will determine if the client has support for certain packet types */
+ int getClientVersion();
+
+ /** The client protocol used on the communication.
+ * This will determine if the client has support for certain packet types */
+ void setClientVersion(int clientVersion);
+
/**
* return the channel with the channel id specified.
* <p/>
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -94,6 +94,19 @@
resendCache = null;
}
}
+
+ public boolean supports(final byte packetType)
+ {
+ int version = connection.getClientVersion();
+
+ switch (packetType)
+ {
+ case PacketImpl.CLUSTER_TOPOLOGY_V2:
+ return version >= 122;
+ default:
+ return true;
+ }
+ }
public long getID()
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -32,9 +32,11 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import
org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
+import
org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.ConnectionEntry;
@@ -112,13 +114,19 @@
// Just send a ping back
channel0.send(packet);
}
- else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY)
+ else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY ||
packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2)
{
SubscribeClusterTopologyUpdatesMessage msg =
(SubscribeClusterTopologyUpdatesMessage)packet;
+ if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2)
+ {
+
channel0.getConnection().setClientVersion(((SubscribeClusterTopologyUpdatesMessageV2)msg).getClientVersion());
+ }
+
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
- public void nodeUP(final String nodeID,
+ public void nodeUP(final long uniqueEventID,
+ final String nodeID,
final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
final boolean last)
{
@@ -129,12 +137,19 @@
{
public void run()
{
-                     channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+                     if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2))
+                     {
+                        channel0.send(new ClusterTopologyChangeMessage_V2(uniqueEventID, nodeID, connectorPair, last));
+                     }
+                     else
+                     {
+                        channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+                     }
}
});
}
- public void nodeDown(final String nodeID)
+ public void nodeDown(final long uniqueEventID, final String nodeID)
{
// Using an executor as most of the notifications on the Topology
// may come from a channel itself
@@ -143,7 +158,14 @@
{
public void run()
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID));
+ if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2))
+ {
+                        channel0.send(new ClusterTopologyChangeMessage_V2(uniqueEventID, nodeID));
+ }
+ else
+ {
+ channel0.send(new ClusterTopologyChangeMessage(nodeID));
+ }
}
});
}
@@ -177,13 +199,13 @@
}
else
{
-                  pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
+                  pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), msg.getBackupConnector());
}
if (isTrace)
{
log.trace("Server " + server + " receiving nodeUp from
NodeID=" + msg.getNodeID() + ", pair=" + pair);
}
-               server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, true);
+               server.getClusterManager().nodeAnnounced(msg.getCurrentEventID(), msg.getNodeID(), pair, msg.isBackup());
}
}
});
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -163,6 +163,17 @@
"Server will not accept create session
requests");
}
+ if (connection.getClientVersion() == 0)
+ {
+ connection.setClientVersion(request.getVersion());
+ }
+ else if (connection.getClientVersion() != request.getVersion())
+ {
+ log.warn("Client is not being consistent on the request versioning.
" +
+ "It just sent a version id=" + request.getVersion() +
+ " while it informed " + connection.getClientVersion() + "
previously");
+ }
+
Channel channel = connection.getChannel(request.getSessionChannelID(),
request.getWindowSize());
ServerSession session = server.createSession(request.getName(),
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -14,6 +14,7 @@
package org.hornetq.core.protocol.core.impl;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY_V2;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATESESSION;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATESESSION_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
@@ -83,11 +84,13 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_START;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY_V2;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
@@ -152,6 +155,7 @@
import
org.hornetq.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
+import
org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
/**
* A PacketDecoder
@@ -504,6 +508,11 @@
packet = new ClusterTopologyChangeMessage();
break;
}
+ case CLUSTER_TOPOLOGY_V2:
+ {
+ packet = new ClusterTopologyChangeMessage_V2();
+ break;
+ }
case NODE_ANNOUNCE:
{
packet = new NodeAnnounceMessage();
@@ -514,6 +523,11 @@
packet = new SubscribeClusterTopologyUpdatesMessage();
break;
}
+ case SUBSCRIBE_TOPOLOGY_V2:
+ {
+ packet = new SubscribeClusterTopologyUpdatesMessageV2();
+ break;
+ }
case SESS_ADD_METADATA:
{
packet = new SessionAddMetaDataMessage();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -193,7 +193,13 @@
public static final byte NODE_ANNOUNCE = 111;
public static final byte SUBSCRIBE_TOPOLOGY = 112;
+
+ // For newer versions
+ public static final byte SUBSCRIBE_TOPOLOGY_V2 = 113;
+
+ public static final byte CLUSTER_TOPOLOGY_V2 = 114;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -72,6 +72,8 @@
private volatile boolean destroyed;
private final boolean client;
+
+ private int clientVersion;
// Channels 0-9 are reserved for the system
// 0 is for pinging
@@ -183,7 +185,23 @@
failureListeners.addAll(listeners);
}
+
+ /**
+ * @return the clientVersion
+ */
+ public int getClientVersion()
+ {
+ return clientVersion;
+ }
+ /**
+ * @param clientVersion the clientVersion to set
+ */
+ public void setClientVersion(int clientVersion)
+ {
+ this.clientVersion = clientVersion;
+ }
+
public Object getID()
{
return transportConnection.getID();
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java
(rev 0)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Clebert Suconic
+ *
+ */
+public class ClusterTopologyChangeMessage_V2 extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private boolean exit;
+
+ private String nodeID;
+
+ private Pair<TransportConfiguration, TransportConfiguration> pair;
+
+ private long uniqueEventID;
+
+ private boolean last;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+   public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last)
+ {
+ super(PacketImpl.CLUSTER_TOPOLOGY_V2);
+
+ this.nodeID = nodeID;
+
+ this.pair = pair;
+
+ this.last = last;
+
+ this.exit = false;
+
+ this.uniqueEventID = uniqueEventID;
+ }
+
+ public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID)
+ {
+ super(PacketImpl.CLUSTER_TOPOLOGY_V2);
+
+ this.exit = true;
+
+ this.nodeID = nodeID;
+
+ this.uniqueEventID = uniqueEventID;
+ }
+
+ public ClusterTopologyChangeMessage_V2()
+ {
+ super(PacketImpl.CLUSTER_TOPOLOGY_V2);
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public Pair<TransportConfiguration, TransportConfiguration> getPair()
+ {
+ return pair;
+ }
+
+ public boolean isLast()
+ {
+ return last;
+ }
+
+ /**
+ * @return the uniqueEventID
+ */
+ public long getUniqueEventID()
+ {
+ return uniqueEventID;
+ }
+
+ public boolean isExit()
+ {
+ return exit;
+ }
+
+
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeBoolean(exit);
+ buffer.writeString(nodeID);
+ buffer.writeLong(uniqueEventID);
+ if (!exit)
+ {
+ if (pair.a != null)
+ {
+ buffer.writeBoolean(true);
+ pair.a.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
+ if (pair.b != null)
+ {
+ buffer.writeBoolean(true);
+ pair.b.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
+ buffer.writeBoolean(last);
+ }
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ exit = buffer.readBoolean();
+ nodeID = buffer.readString();
+ uniqueEventID = buffer.readLong();
+ if (!exit)
+ {
+ boolean hasLive = buffer.readBoolean();
+ TransportConfiguration a;
+ if(hasLive)
+ {
+ a = new TransportConfiguration();
+ a.decode(buffer);
+ }
+ else
+ {
+ a = null;
+ }
+ boolean hasBackup = buffer.readBoolean();
+ TransportConfiguration b;
+ if (hasBackup)
+ {
+ b = new TransportConfiguration();
+ b.decode(buffer);
+ }
+ else
+ {
+ b = null;
+ }
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(a, b);
+ last = buffer.readBoolean();
+ }
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
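The two constructors correspond to the two notification kinds: the four-argument form announces a node (optionally with a backup connector), the two-argument form announces a departure and sets the exit flag. A small sketch of how the packet is built and inspected (node ID, connector class name and event IDs are made-up values):

    import org.hornetq.api.core.Pair;
    import org.hornetq.api.core.TransportConfiguration;
    import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;

    public class TopologyPacketExample
    {
       public static void main(String[] args)
       {
          long eventID = System.currentTimeMillis();

          Pair<TransportConfiguration, TransportConfiguration> pair =
             new Pair<TransportConfiguration, TransportConfiguration>(
                new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"),
                null); // no backup connector announced yet

          // "node up" form: carries the connector pair and the last flag
          ClusterTopologyChangeMessage_V2 up =
             new ClusterTopologyChangeMessage_V2(eventID, "node-1", pair, true);

          // "node down" form: only the event ID and node ID, isExit() returns true
          ClusterTopologyChangeMessage_V2 down =
             new ClusterTopologyChangeMessage_V2(eventID + 1, "node-1");

          System.out.println("up: exit=" + up.isExit() + " last=" + up.isLast() +
                             " eventID=" + up.getUniqueEventID());
          System.out.println("down: exit=" + down.isExit() + " node=" + down.getNodeID());
       }
    }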
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -34,21 +34,29 @@
private boolean backup;
+ private long currentEventID;
+
private TransportConfiguration connector;
+ private TransportConfiguration backupConnector;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
-   public NodeAnnounceMessage(final String nodeID, final boolean backup, final TransportConfiguration tc)
+   public NodeAnnounceMessage(final long currentEventID, final String nodeID, final boolean backup, final TransportConfiguration tc, final TransportConfiguration backupConnector)
{
super(PacketImpl.NODE_ANNOUNCE);
+ this.currentEventID = currentEventID;
+
this.nodeID = nodeID;
this.backup = backup;
this.connector = tc;
+
+ this.backupConnector = backupConnector;
}
public NodeAnnounceMessage()
@@ -74,13 +82,43 @@
return connector;
}
+ public TransportConfiguration getBackupConnector()
+ {
+ return backupConnector;
+ }
+ /**
+ * @return the currentEventID
+ */
+ public long getCurrentEventID()
+ {
+ return currentEventID;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeString(nodeID);
buffer.writeBoolean(backup);
- connector.encode(buffer);
+ buffer.writeLong(currentEventID);
+ if (connector != null)
+ {
+ buffer.writeBoolean(true);
+ connector.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
+ if (backupConnector != null)
+ {
+ buffer.writeBoolean(true);
+ backupConnector.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
}
@Override
@@ -88,8 +126,17 @@
{
this.nodeID = buffer.readString();
this.backup = buffer.readBoolean();
- connector = new TransportConfiguration();
- connector.decode(buffer);
+ this.currentEventID = buffer.readLong();
+ if (buffer.readBoolean())
+ {
+ connector = new TransportConfiguration();
+ connector.decode(buffer);
+ }
+ if (buffer.readBoolean())
+ {
+ backupConnector = new TransportConfiguration();
+ backupConnector.decode(buffer);
+ }
}
/* (non-Javadoc)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -42,11 +42,23 @@
this.clusterConnection = clusterConnection;
}
+   protected SubscribeClusterTopologyUpdatesMessage(byte packetType, final boolean clusterConnection)
+ {
+ super(packetType);
+
+ this.clusterConnection = clusterConnection;
+ }
+
public SubscribeClusterTopologyUpdatesMessage()
{
super(PacketImpl.SUBSCRIBE_TOPOLOGY);
}
+ protected SubscribeClusterTopologyUpdatesMessage(byte packetType)
+ {
+ super(packetType);
+ }
+
// Public --------------------------------------------------------
public boolean isClusterConnection()
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
(rev 0)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class SubscribeClusterTopologyUpdatesMessageV2 extends SubscribeClusterTopologyUpdatesMessage
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private int clientVersion;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+   public SubscribeClusterTopologyUpdatesMessageV2(final boolean clusterConnection, int clientVersion)
+ {
+ super(PacketImpl.SUBSCRIBE_TOPOLOGY_V2, clusterConnection);
+
+ this.clientVersion = clientVersion;
+ }
+
+ public SubscribeClusterTopologyUpdatesMessageV2()
+ {
+ super(PacketImpl.SUBSCRIBE_TOPOLOGY_V2);
+ }
+
+ // Public --------------------------------------------------------
+
+
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ super.encodeRest(buffer);
+ buffer.writeInt(clientVersion);
+ }
+
+ /**
+ * @return the clientVersion
+ */
+ public int getClientVersion()
+ {
+ return clientVersion;
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ super.decodeRest(buffer);
+ clientVersion = buffer.readInt();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
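The V2 subscribe packet keeps the V1 payload intact and appends the client version as a trailing int, which is what lets a peer that only understands V1 still decode the prefix it knows. A minimal, self-contained sketch of that wire layout, using java.nio.ByteBuffer as a stand-in for HornetQBuffer (the class and method names below are illustrative, not the HornetQ API):

import java.nio.ByteBuffer;

// Sketch: V2 = V1 fields + trailing clientVersion int, so the V1 prefix stays decodable.
public final class SubscribeTopologySketch
{
   public static ByteBuffer encodeV2(final boolean clusterConnection, final int clientVersion)
   {
      ByteBuffer buf = ByteBuffer.allocate(5);
      buf.put((byte)(clusterConnection ? 1 : 0)); // V1 payload: the clusterConnection flag
      buf.putInt(clientVersion);                  // V2 extension, always appended last
      buf.flip();
      return buf;
   }

   public static int decodeClientVersion(final ByteBuffer buf)
   {
      buf.get();           // skip the V1 boolean
      return buf.getInt(); // a V2 reader consumes the extra int; a V1 reader simply stops earlier
   }

   public static void main(String[] args)
   {
      ByteBuffer wire = encodeV2(true, 122);
      System.out.println("clientVersion=" + decodeClientVersion(wire));
   }
}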
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java 2011-09-09
18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -15,12 +15,9 @@
import java.util.List;
import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.Xid;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.persistence.OperationContext;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -46,6 +46,8 @@
void activate() throws Exception;
TransportConfiguration getConnector();
+
+ void flushExecutor();
// for debug
String describe();
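ClusterConnection now exposes flushExecutor(); the implementation added later in this commit queues a marker task on the connection's ordered executor and waits up to ten seconds for it, dumping threads if it never runs. A stand-alone sketch of the same drain-and-wait pattern using only JDK types (the timeout mirrors the commit; the rest is illustrative):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: flush an ordered executor by waiting for a marker task queued behind the pending work.
public final class FlushExecutorSketch
{
   public static boolean flush(final ExecutorService executor, final long timeoutMillis) throws InterruptedException
   {
      final CountDownLatch done = new CountDownLatch(1);
      executor.execute(done::countDown);          // runs only after previously submitted tasks
      return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
   }

   public static void main(String[] args) throws InterruptedException
   {
      ExecutorService executor = Executors.newSingleThreadExecutor();
      executor.execute(() -> { /* pending cluster work */ });
      if (!flush(executor, 10000))
      {
         System.err.println("Couldn't finish executor"); // the commit calls server.threadDump(...) here
      }
      executor.shutdown();
   }
}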
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -48,10 +48,8 @@
void activate();
- void notifyNodeDown(String nodeID);
+ void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, boolean nodeAnnounce);
-
Topology getTopology();
void flushExecutor();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -101,7 +101,7 @@
private final Transformer transformer;
- private volatile ClientSessionFactory csf;
+ private volatile ClientSessionFactoryInternal csf;
protected volatile ClientSessionInternal session;
@@ -200,6 +200,7 @@
{
this.notificationService = notificationService;
}
+
public synchronized void start() throws Exception
{
if (started)
@@ -652,6 +653,15 @@
}
}
+ /**
+ * @return
+ */
+ protected ClientSessionFactoryInternal getCurrentFactory()
+ {
+ return csf;
+ }
+
+
/* Hook for creating session factory */
protected ClientSessionFactoryInternal createSessionFactory() throws Exception
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -71,6 +71,8 @@
private final SimpleString idsHeaderName;
private final String targetNodeID;
+
+ private final long targetNodeEventUID;
private final TransportConfiguration connector;
@@ -85,6 +87,7 @@
final double retryMultiplier,
final long maxRetryInterval,
final UUID nodeUUID,
+ final long targetNodeEventUID,
final String targetNodeID,
final SimpleString name,
final Queue queue,
@@ -130,6 +133,7 @@
this.clusterManager = clusterManager;
+ this.targetNodeEventUID = targetNodeEventUID;
this.targetNodeID = targetNodeID;
this.managementAddress = managementAddress;
this.managementNotificationAddress = managementNotificationAddress;
@@ -319,7 +323,7 @@
if (permanently)
{
log.debug("cluster node for bridge " + this.getName() + " is
permanently down");
- discoveryLocator.notifyNodeDown(targetNodeID);
+ discoveryLocator.notifyNodeDown(targetNodeEventUID+1, targetNodeID);
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -33,9 +33,10 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.AfterConnectInternalListener;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
@@ -56,6 +57,7 @@
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.Notification;
import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.Future;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;
@@ -69,7 +71,7 @@
*
*
*/
-public class ClusterConnectionImpl implements ClusterConnection
+public class ClusterConnectionImpl implements ClusterConnection, AfterConnectInternalListener
{
private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
@@ -332,20 +334,34 @@
this.clusterManagerTopology = clusterManagerTopology;
}
- public synchronized void start() throws Exception
+ public void start() throws Exception
{
- if (started)
+ synchronized (this)
{
- return;
+ if (started)
+ {
+ return;
+ }
+
+
+ started = true;
+
+ if (!backup)
+ {
+ activate();
+ }
}
- started = true;
-
- if (!backup)
+ }
+
+ public void flushExecutor()
+ {
+ Future future = new Future();
+ executor.execute(future);
+ if (!future.await(10000))
{
- activate();
+ server.threadDump("Couldn't finish executor on " + this);
}
-
}
public void stop() throws Exception
@@ -411,6 +427,25 @@
started = false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.AfterConnectInternalListener#onConnection(org.hornetq.core.client.impl.ClientSessionFactoryInternal)
+ */
+ public void onConnection(ClientSessionFactoryInternal sf)
+ {
+ TopologyMember localMember = manager.getLocalMember();
+ sf.sendNodeAnnounce(localMember.getUniqueEventID(),
+ manager.getNodeId(),
+ false,
+ localMember.getConnector().a,
+ localMember.getConnector().b);
+
+ // sf.sendNodeAnnounce(System.currentTimeMillis(),
+ // manager.getNodeId(),
+ // false,
+ // localMember.getConnector().a,
+ // localMember.getConnector().b);
+ }
+
public boolean isStarted()
{
return started;
@@ -471,6 +506,14 @@
log.debug("DuplicateDetection is disabled, sending clustered messages
blocked");
}
+ final TopologyMember currentMember = clusterManagerTopology.getMember(nodeUUID.toString());
+
+ if (currentMember == null)
+ {
+ // sanity check only
+ throw new IllegalStateException("InternalError! The ClusterConnection doesn't know about its own node = " + this);
+ }
+
serverLocator.setNodeID(nodeUUID.toString());
serverLocator.setIdentity("(main-ClusterConnection::" + server.toString() + ")");
serverLocator.setReconnectAttempts(0);
@@ -493,6 +536,8 @@
serverLocator.addClusterTopologyListener(this);
+ serverLocator.setAfterConnectionInternalListener(this);
+
serverLocator.start(server.getExecutorFactory().getExecutor());
}
@@ -515,7 +560,7 @@
// ClusterTopologyListener implementation ------------------------------------------------------------------
- public void nodeDown(final String nodeID)
+ public void nodeDown(final long eventUID, final String nodeID)
{
if (log.isDebugEnabled())
{
@@ -544,12 +589,11 @@
{
log.error("Failed to close flow record", e);
}
-
- server.getClusterManager().notifyNodeDown(nodeID);
}
}
- public void nodeUP(final String nodeID,
+ public void nodeUP(final long eventUID,
+ final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
@@ -562,20 +606,18 @@
if (nodeID.equals(nodeUUID.toString()))
{
- if (connectorPair.b != null)
+ if (log.isTraceEnabled())
{
- if (log.isTraceEnabled())
- {
- log.trace(this + "::informing about backup to itself, nodeUUID=" + nodeUUID + ", connectorPair=" + connectorPair + " this = " + this);
- }
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
+ log.trace(this + "::informing about backup to itself, nodeUUID=" +
+ nodeUUID +
+ ", connectorPair=" +
+ connectorPair +
+ " this = " +
+ this);
}
return;
}
- // we propagate the node notifications to all cluster topology listeners
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
-
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
{
@@ -615,6 +657,7 @@
{
log.debug(this + "::Creating record for nodeID=" + nodeID +
", connectorPair=" + connectorPair);
}
+ log.info(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
// New node - create a new flow record
@@ -635,7 +678,7 @@
queue = server.createQueue(queueName, queueName, null, true, false);
}
- createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
+ createNewRecord(eventUID, nodeID, connectorPair.a, queueName, queue, true);
}
else
{
@@ -656,7 +699,8 @@
}
}
- private void createNewRecord(final String targetNodeID,
+ private void createNewRecord(final long eventUID,
+ final String targetNodeID,
final TransportConfiguration connector,
final SimpleString queueName,
final Queue queue,
@@ -679,6 +723,8 @@
targetLocator.setMaxRetryInterval(maxRetryInterval);
targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
+ targetLocator.setAfterConnectionInternalListener(this);
+
targetLocator.setNodeID(serverLocator.getNodeID());
targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
@@ -689,9 +735,14 @@
}
targetLocator.disableFinalizeCheck();
-
- MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, targetNodeID, connector, queueName, queue);
+ MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator,
+ eventUID,
+ targetNodeID,
+ connector,
+ queueName,
+ queue);
+
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
manager,
targetLocator,
@@ -701,6 +752,7 @@
retryIntervalMultiplier,
maxRetryInterval,
nodeUUID,
+ record.getEventUID(),
record.getTargetNodeID(),
record.getQueueName(),
record.getQueue(),
@@ -742,6 +794,8 @@
{
private BridgeImpl bridge;
+ private final long eventUID;
+
private final String targetNodeID;
private final TransportConfiguration connector;
@@ -761,6 +815,7 @@
private volatile boolean firstReset = false;
public MessageFlowRecordImpl(final ServerLocatorInternal targetLocator,
+ final long eventUID,
final String targetNodeID,
final TransportConfiguration connector,
final SimpleString queueName,
@@ -771,6 +826,7 @@
this.targetNodeID = targetNodeID;
this.connector = connector;
this.queueName = queueName;
+ this.eventUID = eventUID;
}
/* (non-Javadoc)
@@ -804,6 +860,14 @@
}
/**
+ * @return the eventUID
+ */
+ public long getEventUID()
+ {
+ return eventUID;
+ }
+
+ /**
* @return the nodeID
*/
public String getTargetNodeID()
@@ -1091,7 +1155,8 @@
// hops is too high
// or there are multiple cluster connections for the same address
- ClusterConnectionImpl.log.warn("Remote queue binding " + clusterName +
+ ClusterConnectionImpl.log.warn(this + "::Remote queue binding " +
+ clusterName +
" has already been bound in the post office. Most likely cause for this is you have a loop " +
"in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
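Throughout this file the topology callbacks now carry an event UID: nodeUP/nodeDown take an eventUID, createNewRecord stores it on the flow record, and ClusterConnectionBridge reports a permanent node-down with targetNodeEventUID + 1 so it outranks the announcement that created the record but not a later one. A self-contained sketch of that last-event-wins bookkeeping, assuming event ids grow monotonically per node (the types are simplified stand-ins, not the Topology API):

import java.util.HashMap;
import java.util.Map;

// Sketch: topology updates and removals only win if they carry a newer event id.
public final class TopologyEventSketch
{
   static final class Member
   {
      final long eventId;
      final String connector;
      Member(long eventId, String connector) { this.eventId = eventId; this.connector = connector; }
   }

   private final Map<String, Member> members = new HashMap<String, Member>();

   // Apply an update only if it is newer than what is already recorded for the node.
   public synchronized boolean updateMember(final long eventId, final String nodeId, final String connector)
   {
      Member current = members.get(nodeId);
      if (current != null && current.eventId >= eventId)
      {
         return false; // stale event, ignore
      }
      members.put(nodeId, new Member(eventId, connector));
      return true;
   }

   // A node-down sent with eventId + 1 beats the update that created the record,
   // but cannot erase a later announcement for the same node.
   public synchronized boolean removeMember(final long eventId, final String nodeId)
   {
      Member current = members.get(nodeId);
      if (current == null || current.eventId >= eventId)
      {
         return false;
      }
      members.remove(nodeId);
      return true;
   }
}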
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -168,6 +168,16 @@
{
return "ClusterManagerImpl[server=" + server + "]@" +
System.identityHashCode(this);
}
+
+ public TopologyMember getLocalMember()
+ {
+ return topology.getMember(nodeUUID.toString());
+ }
+
+ public String getNodeId()
+ {
+ return nodeUUID.toString();
+ }
public synchronized void start() throws Exception
{
@@ -183,9 +193,34 @@
deployBroadcastGroup(config);
}
+ String connectorName = null;
+
for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
{
- deployClusterConnection(config);
+ if (connectorName == null)
+ {
+ connectorName = config.getConnectorName();
+ break;
+ }
+ }
+
+ if (connectorName != null)
+ {
+ TransportConfiguration nodeConnector = configuration.getConnectorConfigurations().get(connectorName);
+ if (nodeConnector == null)
+ {
+ log.warn("No connecor with name '" + connectorName +
+ "'. The cluster connection will not be
deployed.");
+ return;
+ }
+
+ // Now announce presence
+ announceNode(nodeConnector);
+
+ for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
+ {
+ deployClusterConnection(config);
+ }
}
}
@@ -195,13 +230,6 @@
deployBridge(config);
}
- // Now announce presence
-
- if (clusterConnections.size() > 0)
- {
- announceNode();
- }
-
started = true;
}
@@ -264,81 +292,26 @@
clusterConnections.clear();
}
- public void notifyNodeDown(String nodeID)
+ public void nodeAnnounced(final long uniqueEventID,
+ final String nodeID,
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final boolean backup)
{
- if (nodeID.equals(nodeUUID.toString()))
- {
- return;
- }
-
- log.debug(this + "::removing nodeID=" + nodeID, new
Exception("trace"));
-
- topology.removeMember(nodeID);
-
- }
-
- public void notifyNodeUp(final String nodeID,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last,
- final boolean nodeAnnounce)
- {
if (log.isDebugEnabled())
{
- log.debug(this + "::NodeUp " + nodeID + connectorPair + ",
nodeAnnounce=" + nodeAnnounce);
+ log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID +
connectorPair);
}
- TopologyMember member = new TopologyMember(connectorPair);
- boolean updated = topology.addMember(nodeID, member, last);
-
- if (!updated)
+ TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
+ newMember.setUniqueEventID(uniqueEventID);
+ if (backup)
{
- if (log.isDebugEnabled())
- {
- log.debug(this + " ignored notifyNodeUp on nodeID=" +
- nodeID +
- " pair=" +
- connectorPair +
- " as the topology already knew about it");
- }
- return;
+ topology.updateBackup(nodeID, new TopologyMember(connectorPair.a, connectorPair.b));
}
-
- if (log.isDebugEnabled())
+ else
{
- log.debug(this + " received notifyNodeUp nodeID=" +
- nodeID +
- " connectorPair=" +
- connectorPair +
- ", nodeAnnounce=" +
- nodeAnnounce +
- ", last=" +
- last);
+ topology.updateMember(uniqueEventID, nodeID, newMember);
}
-
- // if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster
- // connections.
- if (nodeAnnounce)
- {
- if (log.isDebugEnabled())
- {
- log.debug("Informing " + nodeID + " to " +
clusterConnections.toString());
- }
- for (ClusterConnection clusterConnection : clusterConnections.values())
- {
- if (log.isTraceEnabled())
- {
- log.trace(this + " information clusterConnection=" +
- clusterConnection +
- " nodeID=" +
- nodeID +
- " connectorPair=" +
- connectorPair +
- " last=" +
- last);
- }
- clusterConnection.nodeUP(nodeID, connectorPair, last);
- }
- }
}
public void flushExecutor()
@@ -347,7 +320,8 @@
executor.execute(future);
if (!future.await(10000))
{
- server.threadDump("Couldn't flush ClusterManager executor (" +
this + ") in 10 seconds, verify your thread pool size");
+ server.threadDump("Couldn't flush ClusterManager executor (" +
this +
+ ") in 10 seconds, verify your thread pool size");
}
}
@@ -405,9 +379,8 @@
TopologyMember member = topology.getMember(nodeID);
// swap backup as live and send it to everybody
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(member.getConnector().b,
- null));
- topology.addMember(nodeID, member, false);
+ member = new TopologyMember(member.getConnector().b, null);
+ topology.updateAsLive(nodeID, member);
if (backupServerLocator != null)
{
@@ -460,7 +433,7 @@
}
}
- topology.sendMemberToListeners(nodeID, member);
+ topology.sendMember(nodeID);
}
}
@@ -496,43 +469,21 @@
this.clusterLocators.remove(serverLocator);
}
- private synchronized void announceNode()
+ private synchronized void announceNode(final TransportConfiguration nodeConnector)
{
- // TODO does this really work with more than one cluster connection? I think not
-
- // Just take the first one for now
- ClusterConnection cc = clusterConnections.values().iterator().next();
-
String nodeID = server.getNodeID().toString();
-
- TopologyMember member = topology.getMember(nodeID);
-
- if (member == null)
+
+ TopologyMember localMember;
+ if (backup)
{
- if (backup)
- {
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null,
- cc.getConnector()));
- }
- else
- {
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(),
- null));
- }
-
- topology.addMember(nodeID, member, false);
+ localMember = new TopologyMember(null, nodeConnector);
}
else
{
- if (backup)
- {
- // pair.b = cc.getConnector();
- }
- else
- {
- // pair.a = cc.getConnector();
- }
+ localMember = new TopologyMember(nodeConnector, null);
}
+
+ topology.updateAsLive(nodeID, localMember);
}
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration
config) throws Exception
@@ -957,7 +908,11 @@
{
backupSessionFactory.getConnection()
.getChannel(0, -1)
- .send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector));
+ .send(new NodeAnnounceMessage(System.currentTimeMillis(),
+ nodeUUID.toString(),
+ true,
+ connector,
+ null));
log.info("backup announced");
}
}
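start() now resolves the connector of the first configured cluster connection, announces the local node with it, and only then deploys the cluster connections; previously the announcement ran after deployment and pulled the connector from an already-deployed connection. A simplified sketch of the new ordering, with plain strings standing in for the HornetQ configuration classes:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: announce the node using the first cluster connection's connector,
// then deploy every cluster connection.
public final class AnnounceBeforeDeploySketch
{
   static final class ClusterConfig
   {
      final String name;
      final String connectorName;
      ClusterConfig(String name, String connectorName) { this.name = name; this.connectorName = connectorName; }
   }

   public static void start(final List<ClusterConfig> clusterConfigs,
                            final Map<String, String> connectorConfigurations)
   {
      if (clusterConfigs.isEmpty())
      {
         return; // nothing clustered to announce or deploy
      }
      String connectorName = clusterConfigs.get(0).connectorName;
      String nodeConnector = connectorConfigurations.get(connectorName);
      if (nodeConnector == null)
      {
         System.err.println("No connector with name '" + connectorName + "'. The cluster connection will not be deployed.");
         return;
      }
      announceNode(nodeConnector);              // presence goes out first...
      for (ClusterConfig config : clusterConfigs)
      {
         deployClusterConnection(config.name);  // ...then the connections are deployed
      }
   }

   static void announceNode(String connector)       { System.out.println("announced via " + connector); }
   static void deployClusterConnection(String name) { System.out.println("deployed " + name); }

   public static void main(String[] args)
   {
      Map<String, String> connectors = new LinkedHashMap<String, String>();
      connectors.put("netty", "tcp://localhost:5445");
      start(Arrays.asList(new ClusterConfig("my-cluster", "netty")), connectors);
   }
}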
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -14,6 +14,7 @@
package org.hornetq.core.server.cluster.impl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.server.cluster.ClusterManager;
/**
@@ -28,5 +29,9 @@
void addClusterLocator(ServerLocatorInternal locator);
void removeClusterLocator(ServerLocatorInternal locator);
+
+ TopologyMember getLocalMember();
+
+ String getNodeId();
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -354,7 +354,6 @@
if(nodeManager.isBackupLive())
{
- Thread.sleep(configuration.getFailbackDelay());
//looks like we've failed over at some point need to inform that we
are the backup so when the current live
// goes down they failover to us
clusterManager.announceBackup();
@@ -514,7 +513,7 @@
if (System.currentTimeMillis() - start >= timeout)
{
- log.warn("Timed out waiting for backup activation to exit");
+ threadDump("Timed out waiting for backup activation to exit");
}
nodeManager.stopBackup();
@@ -862,6 +861,10 @@
nodeManager.stop();
nodeManager = null;
+
+ addressSettingsRepository.clearListeners();
+
+ addressSettingsRepository.clearCache();
HornetQServerImpl.log.info("HornetQ Server version " +
getVersion().getFullVersion() + " [" + tempNodeID + "] stopped");
@@ -2004,7 +2007,14 @@
public String toString()
{
- return "HornetQServerImpl::" + (identity == null ? "" :
(identity + ", ")) + (nodeManager != null ? ("serverUUID=" +
nodeManager.getUUID()) : "");
+ if (identity != null)
+ {
+ return "HornetQServerImpl::" + identity;
+ }
+ else
+ {
+ return "HornetQServerImpl::" + (nodeManager != null ?
"serverUUID=" + nodeManager.getUUID() : "");
+ }
}
// Inner classes
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -83,12 +83,12 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
*/
-public class ServerSessionImpl implements ServerSession , FailureListener
+public class ServerSessionImpl implements ServerSession, FailureListener
{
// Constants
-----------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
// Static
-------------------------------------------------------------------------------
@@ -147,14 +147,14 @@
private volatile SimpleString defaultAddress;
private volatile int timeoutSeconds;
-
+
private Map<String, String> metaData;
-
+
private OperationContext sessionContext;
// Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
- private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
-
+ private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
+
private long creationTime = System.currentTimeMillis();
// Constructors
---------------------------------------------------------------------------------
@@ -244,7 +244,6 @@
this.sessionContext = sessionContext;
}
-
public String getUsername()
{
return username;
@@ -269,8 +268,9 @@
{
return remotingConnection.getID();
}
-
- public Set<ServerConsumer> getServerConsumers() {
+
+ public Set<ServerConsumer> getServerConsumers()
+ {
Set<ServerConsumer> consumersClone = new
HashSet<ServerConsumer>(consumers.values());
return Collections.unmodifiableSet(consumersClone);
}
@@ -316,7 +316,7 @@
}
remotingConnection.removeFailureListener(this);
-
+
callback.closed();
}
@@ -402,7 +402,7 @@
// dies. It does not mean it will get deleted automatically when the
// session is closed.
// It is up to the user to delete the queue when finished with it
-
+
TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(postOffice, name,
queue);
remotingConnection.addCloseListener(cleaner);
@@ -411,8 +411,7 @@
tempQueueCleannerUppers.put(name, cleaner);
}
}
-
-
+
/**
* For test cases only
* @return
@@ -427,7 +426,7 @@
private final PostOffice postOffice;
private final SimpleString bindingName;
-
+
private final Queue queue;
TempQueueCleanerUpper(final PostOffice postOffice, final SimpleString bindingName,
final Queue queue)
@@ -435,7 +434,7 @@
this.postOffice = postOffice;
this.bindingName = bindingName;
-
+
this.queue = queue;
}
@@ -443,15 +442,15 @@
{
try
{
- if (log.isDebugEnabled())
- {
- log.debug("deleting temporary queue " + bindingName);
- }
+ if (log.isDebugEnabled())
+ {
+ log.debug("deleting temporary queue " + bindingName);
+ }
if (postOffice.getBinding(bindingName) != null)
{
postOffice.removeBinding(bindingName);
}
-
+
queue.deleteAllReferences();
}
catch (Exception e)
@@ -469,7 +468,7 @@
{
run();
}
-
+
public String toString()
{
return "Temporary Cleaner for queue " + bindingName;
@@ -489,11 +488,11 @@
server.destroyQueue(name, this);
TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(name);
-
+
if (cleaner != null)
{
remotingConnection.removeCloseListener(cleaner);
-
+
remotingConnection.removeFailureListener(cleaner);
}
}
@@ -576,8 +575,8 @@
public void acknowledge(final long consumerID, final long messageID) throws Exception
{
ServerConsumer consumer = consumers.get(consumerID);
-
- consumer.acknowledge(autoCommitAcks, tx, messageID);
+
+ consumer.acknowledge(autoCommitAcks, tx, messageID);
}
public void individualAcknowledge(final long consumerID, final long messageID) throws
Exception
@@ -935,7 +934,7 @@
throw new HornetQXAException(XAException.XAER_PROTO,
"Cannot prepare transaction, it is
suspended " + xid);
}
- else if(theTx.getState() == Transaction.State.PREPARED)
+ else if (theTx.getState() == Transaction.State.PREPARED)
{
log.info("ignoring prepare on xid as already called :" + xid);
}
@@ -966,7 +965,7 @@
public void xaSetTimeout(final int timeout)
{
timeoutSeconds = timeout;
- if(tx != null)
+ if (tx != null)
{
tx.setTimeout(timeout);
}
@@ -981,18 +980,18 @@
{
setStarted(false);
}
-
+
public void waitContextCompletion()
{
OperationContext formerCtx = storageManager.getContext();
-
+
try
{
try
{
if (!storageManager.waitOnOperations(10000))
{
- log.warn("Couldn't finish context execution in 10 seconds",
new Exception ("warning"));
+ log.warn("Couldn't finish context execution in 10 seconds",
new Exception("warning"));
}
}
catch (Exception e)
@@ -1009,7 +1008,7 @@
public void close(final boolean failed)
{
OperationContext formerCtx = storageManager.getContext();
-
+
try
{
storageManager.setContext(sessionContext);
@@ -1019,7 +1018,7 @@
public void onError(int errorCode, String errorMessage)
{
}
-
+
public void done()
{
try
@@ -1071,9 +1070,9 @@
{
// need to create the LargeMessage before continue
long id = storageManager.generateUniqueID();
-
+
LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
-
+
if (currentLargeMessage != null)
{
ServerSessionImpl.log.warn("Replacing incomplete LargeMessage with
ID=" + currentLargeMessage.getMessageID());
@@ -1085,7 +1084,7 @@
public void send(final ServerMessage message, final boolean direct) throws Exception
{
long id = storageManager.generateUniqueID();
-
+
SimpleString address = message.getAddress();
message.setMessageID(id);
@@ -1111,7 +1110,6 @@
log.trace("send(message=" + message + ", direct=" + direct +
") being called");
}
-
if (message.getAddress().equals(managementAddress))
{
// It's a management message
@@ -1129,7 +1127,10 @@
}
}
- public void sendContinuations(final int packetSize, final long messageBodySize, final byte[] body, final boolean continues) throws Exception
+ public void sendContinuations(final int packetSize,
+ final long messageBodySize,
+ final byte[] body,
+ final boolean continues) throws Exception
{
if (currentLargeMessage == null)
{
@@ -1144,7 +1145,7 @@
if (!continues)
{
currentLargeMessage.releaseResources();
-
+
if (messageBodySize >= 0)
{
currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE,
messageBodySize);
@@ -1178,7 +1179,6 @@
consumer.setTransferring(transferring);
}
}
-
public void addMetaData(String key, String data)
{
@@ -1198,7 +1198,7 @@
}
return data;
}
-
+
public String[] getTargetAddresses()
{
Map<SimpleString, Pair<UUID, AtomicLong>> copy =
cloneTargetAddresses();
@@ -1238,7 +1238,7 @@
public void describeProducersInfo(JSONArray array) throws Exception
{
Map<SimpleString, Pair<UUID, AtomicLong>> targetCopy =
cloneTargetAddresses();
-
+
for (Map.Entry<SimpleString, Pair<UUID, AtomicLong>> entry :
targetCopy.entrySet())
{
JSONObject producerInfo = new JSONObject();
@@ -1251,7 +1251,6 @@
}
}
-
// FailureListener implementation
// --------------------------------------------------------------------
@@ -1271,7 +1270,6 @@
}
}
-
// Public
// ----------------------------------------------------------------------------
@@ -1337,7 +1335,7 @@
toCancel.addAll(consumer.cancelRefs(false, lastMessageAsDelived, theTx));
}
-
+
for (MessageReference ref : toCancel)
{
ref.getQueue().cancel(theTx, ref);
@@ -1379,12 +1377,12 @@
}
postOffice.route(msg, routingContext, direct);
-
+
Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
-
+
if (value == null)
{
- targetAddressInfos.put(msg.getAddress(), new Pair<UUID,AtomicLong>(msg.getUserID(), new AtomicLong(1)));
+ targetAddressInfos.put(msg.getAddress(), new Pair<UUID, AtomicLong>(msg.getUserID(), new AtomicLong(1)));
}
else
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -304,11 +304,14 @@
public void unregisterAcceptors()
{
List<String> acceptors = new ArrayList<String>();
- for (String resourceName : registry.keySet())
+ synchronized (this)
{
- if (resourceName.startsWith(ResourceNames.CORE_ACCEPTOR))
+ for (String resourceName : registry.keySet())
{
- acceptors.add(resourceName);
+ if (resourceName.startsWith(ResourceNames.CORE_ACCEPTOR))
+ {
+ acceptors.add(resourceName);
+ }
}
}
@@ -508,7 +511,7 @@
registry.put(resourceName, managedResource);
}
- public void unregisterFromRegistry(final String resourceName)
+ public synchronized void unregisterFromRegistry(final String resourceName)
{
registry.remove(resourceName);
}
@@ -618,7 +621,29 @@
messageCounterManager.clear();
}
+
+ listeners.clear();
+
+ registry.clear();
+ messagingServer = null;
+
+ securityRepository = null;
+
+ addressSettingsRepository = null;
+
+ messagingServerControl = null;
+
+ messageCounterManager = null;
+
+ postOffice = null;
+
+ pagingManager = null;
+
+ storageManager = null;
+
+ messagingServer = null;
+
registeredNames.clear();
started = false;
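unregisterAcceptors() now builds its snapshot of matching resource names inside a synchronized block, and unregisterFromRegistry() is synchronized to match, so a concurrent unregister can no longer break the keySet iteration. A stand-alone sketch of the same snapshot-under-lock guard (simplified types, not the ManagementService API):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: snapshot the matching keys while holding the same lock used for mutation,
// so iteration can't race a concurrent register/unregister.
public final class RegistrySnapshotSketch
{
   private final Map<String, Object> registry = new HashMap<String, Object>();

   public synchronized void register(String name, Object resource) { registry.put(name, resource); }

   public synchronized void unregister(String name)                { registry.remove(name); }

   public List<String> namesWithPrefix(final String prefix)
   {
      List<String> matches = new ArrayList<String>();
      synchronized (this)                  // same monitor as register/unregister
      {
         for (String name : registry.keySet())
         {
            if (name.startsWith(prefix))
            {
               matches.add(name);
            }
         }
      }
      return matches;                      // safe to use outside the lock
   }
}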
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -65,6 +65,8 @@
*/
void clear();
+ void clearListeners();
+
void clearCache();
int getCacheSize();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -221,6 +221,11 @@
matches.clear();
}
+ public void clearListeners()
+ {
+ listeners.clear();
+ }
+
public void clearCache()
{
cache.clear();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JMSMessageCounterTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JMSMessageCounterTest.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JMSMessageCounterTest.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -40,37 +40,49 @@
public void testMessageCounter() throws Exception
{
- Connection conn = cf.createConnection();
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue = createQueue(true, "Test");
-
- MessageProducer producer = sess.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
+ try
{
- TextMessage mess = sess.createTextMessage("msg" + i);
- producer.send(mess);
+ Connection conn = cf.createConnection();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = createQueue(true, "Test");
+
+ MessageProducer producer = sess.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage mess = sess.createTextMessage("msg" + i);
+ producer.send(mess);
+ }
+
+ conn.close();
+
+ JMSQueueControl control = (JMSQueueControl)server.getManagementService().getResource(ResourceNames.JMS_QUEUE + queue.getQueueName());
+ assertNotNull(control);
+
+ System.out.println(control.listMessageCounterAsHTML());
+
+ jmsServer.stop();
+
+ restartServer();
+
+ control = (JMSQueueControl)server.getManagementService().getResource(ResourceNames.JMS_QUEUE + queue.getQueueName());
+ assertNotNull(control);
+
+ System.out.println(control.listMessageCounterAsHTML());
}
-
- conn.close();
-
- JMSQueueControl control = (JMSQueueControl)server.getManagementService().getResource(ResourceNames.JMS_QUEUE + queue.getQueueName());
- assertNotNull(control);
-
- System.out.println(control.listMessageCounterAsHTML());
-
- jmsServer.stop();
-
- restartServer();
-
- control = (JMSQueueControl)server.getManagementService().getResource(ResourceNames.JMS_QUEUE + queue.getQueueName());
- assertNotNull(control);
-
- System.out.println(control.listMessageCounterAsHTML());
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ jmsServer.stop();
+ }
}
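The test body is now wrapped so the server is always stopped in a finally block even when an assertion or send fails, and ProducerFlowControlTest below gets the same treatment. A tiny self-contained sketch of the pattern (FakeServer stands in for the real test fixture):

// Sketch: do the work in try, always stop the server in finally,
// so one failing test cannot leak a running server into the next one.
public final class TryFinallyTestSketch
{
   static final class FakeServer
   {
      void start() { System.out.println("started"); }
      void stop()  { System.out.println("stopped"); }
   }

   public static void main(String[] args)
   {
      FakeServer server = new FakeServer();
      server.start();
      try
      {
         // ... exercise the server; any exception still reaches the finally block ...
         throw new IllegalStateException("simulated test failure");
      }
      catch (IllegalStateException expected)
      {
         System.out.println("test body failed: " + expected.getMessage());
      }
      finally
      {
         server.stop(); // runs on both the success and failure paths
      }
   }
}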
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2011-09-09
18:38:31 UTC (rev 11313)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2011-09-09
20:31:28 UTC (rev 11314)
@@ -217,137 +217,144 @@
repos.addMatch(address.toString(), addressSettings);
server.start();
+ waitForServer(server);
- locator.setProducerWindowSize(producerWindowSize);
- locator.setConsumerWindowSize(consumerWindowSize);
- locator.setAckBatchSize(ackBatchSize);
-
- if (minLargeMessageSize != -1)
+ try
{
- locator.setMinLargeMessageSize(minLargeMessageSize);
- }
- ClientSessionFactory sf = locator.createSessionFactory();
- ClientSession session = sf.createSession(false, true, true, true);
+ locator.setProducerWindowSize(producerWindowSize);
+ locator.setConsumerWindowSize(consumerWindowSize);
+ locator.setAckBatchSize(ackBatchSize);
- session.start();
+ if (minLargeMessageSize != -1)
+ {
+ locator.setMinLargeMessageSize(minLargeMessageSize);
+ }
- final String queueName = "testqueue";
+ ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(false, true, true, true);
- for (int i = 0; i < numConsumers; i++)
- {
- session.createQueue(address, new SimpleString(queueName + i), null, false);
- }
+ session.start();
- final byte[] bytes = RandomUtil.randomBytes(messageSize);
+ final String queueName = "testqueue";
- class MyHandler implements MessageHandler
- {
- int count = 0;
+ for (int i = 0; i < numConsumers; i++)
+ {
+ session.createQueue(address, new SimpleString(queueName + i), null, false);
+ }
- final CountDownLatch latch = new CountDownLatch(1);
+ final byte[] bytes = RandomUtil.randomBytes(messageSize);
- volatile Exception exception;
-
- public void onMessage(final ClientMessage message)
+ class MyHandler implements MessageHandler
{
- try
- {
- byte[] bytesRead = new byte[messageSize];
+ int count = 0;
- message.getBodyBuffer().readBytes(bytesRead);
+ final CountDownLatch latch = new CountDownLatch(1);
- UnitTestCase.assertEqualsByteArrays(bytes, bytesRead);
+ volatile Exception exception;
- message.acknowledge();
-
- if (++count == numMessages * numProducers)
+ public void onMessage(final ClientMessage message)
+ {
+ try
{
- latch.countDown();
- }
+ byte[] bytesRead = new byte[messageSize];
- if (consumerDelay > 0)
- {
- Thread.sleep(consumerDelay);
- }
+ message.getBodyBuffer().readBytes(bytesRead);
- }
- catch (Exception e)
- {
- ProducerFlowControlTest.log.error("Failed to handle message",
e);
+ UnitTestCase.assertEqualsByteArrays(bytes, bytesRead);
- exception = e;
+ message.acknowledge();
- latch.countDown();
- }
- }
- }
+ if (++count == numMessages * numProducers)
+ {
+ latch.countDown();
+ }
- MyHandler[] handlers = new MyHandler[numConsumers];
+ if (consumerDelay > 0)
+ {
+ Thread.sleep(consumerDelay);
+ }
- for (int i = 0; i < numConsumers; i++)
- {
- handlers[i] = new MyHandler();
+ }
+ catch (Exception e)
+ {
+ ProducerFlowControlTest.log.error("Failed to handle message",
e);
- ClientConsumer consumer = session.createConsumer(new SimpleString(queueName +
i));
+ exception = e;
- consumer.setMessageHandler(handlers[i]);
- }
+ latch.countDown();
+ }
+ }
+ }
- ClientProducer[] producers = new ClientProducer[numProducers];
+ MyHandler[] handlers = new MyHandler[numConsumers];
- for (int i = 0; i < numProducers; i++)
- {
- if (anon)
+ for (int i = 0; i < numConsumers; i++)
{
- producers[i] = session.createProducer();
- }
- else
- {
- producers[i] = session.createProducer(address);
- }
- }
+ handlers[i] = new MyHandler();
- long start = System.currentTimeMillis();
+ ClientConsumer consumer = session.createConsumer(new SimpleString(queueName +
i));
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(false);
+ consumer.setMessageHandler(handlers[i]);
+ }
- message.getBodyBuffer().writeBytes(bytes);
+ ClientProducer[] producers = new ClientProducer[numProducers];
- for (int j = 0; j < numProducers; j++)
+ for (int i = 0; i < numProducers; i++)
{
if (anon)
{
- producers[j].send(address, message);
+ producers[i] = session.createProducer();
}
else
{
- producers[j].send(message);
+ producers[i] = session.createProducer(address);
}
+ }
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(false);
+
+ message.getBodyBuffer().writeBytes(bytes);
+
+ for (int j = 0; j < numProducers; j++)
+ {
+ if (anon)
+ {
+ producers[j].send(address, message);
+ }
+ else
+ {
+ producers[j].send(message);
+ }
+
+ }
}
- }
- for (int i = 0; i < numConsumers; i++)
- {
- assertTrue(handlers[i].latch.await(5, TimeUnit.MINUTES));
+ for (int i = 0; i < numConsumers; i++)
+ {
+ Assert.assertTrue(handlers[i].latch.await(5, TimeUnit.MINUTES));
- Assert.assertNull(handlers[i].exception);
- }
+ Assert.assertNull(handlers[i].exception);
+ }
- long end = System.currentTimeMillis();
+ long end = System.currentTimeMillis();
- double rate = 1000 * (double)numMessages / (end - start);
+ double rate = 1000 * (double)numMessages / (end - start);
- ProducerFlowControlTest.log.info("rate is " + rate + " msgs /
sec");
+ ProducerFlowControlTest.log.info("rate is " + rate + " msgs /
sec");
- session.close();
+ session.close();
- sf.close();
-
- server.stop();
+ sf.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testClosingSessionUnblocksBlockedProducer() throws Exception
@@ -364,56 +371,63 @@
repos.addMatch(address.toString(), addressSettings);
server.start();
+ waitForServer(server);
- locator.setProducerWindowSize(1024);
- locator.setConsumerWindowSize(1024);
- locator.setAckBatchSize(1024);
+ try
+ {
- ClientSessionFactory sf = locator.createSessionFactory();
- final ClientSession session = sf.createSession(false, true, true, true);
+ locator.setProducerWindowSize(1024);
+ locator.setConsumerWindowSize(1024);
+ locator.setAckBatchSize(1024);
- final SimpleString queueName = new SimpleString("testqueue");
+ ClientSessionFactory sf = locator.createSessionFactory();
+ final ClientSession session = sf.createSession(false, true, true, true);
- session.createQueue(address, queueName, null, false);
+ final SimpleString queueName = new SimpleString("testqueue");
- ClientProducer producer = session.createProducer(address);
+ session.createQueue(address, queueName, null, false);
- byte[] bytes = new byte[2000];
+ ClientProducer producer = session.createProducer(address);
- ClientMessage message = session.createMessage(false);
+ byte[] bytes = new byte[2000];
- message.getBodyBuffer().writeBytes(bytes);
+ ClientMessage message = session.createMessage(false);
- final AtomicBoolean closed = new AtomicBoolean(false);
+ message.getBodyBuffer().writeBytes(bytes);
- Thread t = new Thread(new Runnable()
- {
- public void run()
+ final AtomicBoolean closed = new AtomicBoolean(false);
+
+ Thread t = new Thread(new Runnable()
{
- try
+ public void run()
{
- Thread.sleep(500);
+ try
+ {
+ Thread.sleep(500);
- closed.set(true);
+ closed.set(true);
- session.close();
+ session.close();
+ }
+ catch (Exception e)
+ {
+ }
}
- catch (Exception e)
- {
- }
- }
- });
+ });
- t.start();
+ t.start();
- // This will block
- producer.send(message);
+ // This will block
+ producer.send(message);
- Assert.assertTrue(closed.get());
+ Assert.assertTrue(closed.get());
- t.join();
-
- server.stop();
+ t.join();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testFlowControlMessageNotRouted() throws Exception
@@ -430,33 +444,40 @@
repos.addMatch(address.toString(), addressSettings);
server.start();
+ waitForServer(server);
- locator.setProducerWindowSize(1024);
- locator.setConsumerWindowSize(1024);
- locator.setAckBatchSize(1024);
+ try
+ {
- ClientSessionFactory sf = locator.createSessionFactory();
+ locator.setProducerWindowSize(1024);
+ locator.setConsumerWindowSize(1024);
+ locator.setAckBatchSize(1024);
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- ClientProducer producer = session.createProducer(address);
+ final ClientSession session = sf.createSession(false, true, true, true);
- byte[] bytes = new byte[100];
+ ClientProducer producer = session.createProducer(address);
- final int numMessages = 1000;
+ byte[] bytes = new byte[100];
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(false);
+ final int numMessages = 1000;
- message.getBodyBuffer().writeBytes(bytes);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(false);
- producer.send(message);
- }
+ message.getBodyBuffer().writeBytes(bytes);
- session.close();
+ producer.send(message);
+ }
- server.stop();
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
// Not technically a flow control test, but what the hell
@@ -465,66 +486,73 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
- session.createQueue("address", "queue2", null, false);
- session.createQueue("address", "queue3", null, false);
- session.createQueue("address", "queue4", null, false);
- session.createQueue("address", "queue5", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientConsumer consumer1 = session.createConsumer("queue1");
- ClientConsumer consumer2 = session.createConsumer("queue2");
- ClientConsumer consumer3 = session.createConsumer("queue3");
- ClientConsumer consumer4 = session.createConsumer("queue4");
- ClientConsumer consumer5 = session.createConsumer("queue5");
+ session.createQueue("address", "queue1", null, false);
+ session.createQueue("address", "queue2", null, false);
+ session.createQueue("address", "queue3", null, false);
+ session.createQueue("address", "queue4", null, false);
+ session.createQueue("address", "queue5", null, false);
- ClientProducer producer = session.createProducer("address");
+ ClientConsumer consumer1 = session.createConsumer("queue1");
+ ClientConsumer consumer2 = session.createConsumer("queue2");
+ ClientConsumer consumer3 = session.createConsumer("queue3");
+ ClientConsumer consumer4 = session.createConsumer("queue4");
+ ClientConsumer consumer5 = session.createConsumer("queue5");
- byte[] bytes = new byte[2000];
+ ClientProducer producer = session.createProducer("address");
- ClientMessage message = session.createMessage(false);
+ byte[] bytes = new byte[2000];
- message.getBodyBuffer().writeBytes(bytes);
+ ClientMessage message = session.createMessage(false);
- final int numMessages = 1000;
+ message.getBodyBuffer().writeBytes(bytes);
- for (int i = 0; i < numMessages; i++)
- {
- producer.send(message);
- }
+ final int numMessages = 1000;
- session.start();
+ for (int i = 0; i < numMessages; i++)
+ {
+ producer.send(message);
+ }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage msg = consumer1.receive(1000);
+ session.start();
- Assert.assertNotNull(msg);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = consumer1.receive(1000);
- msg = consumer2.receive(5000);
+ Assert.assertNotNull(msg);
- Assert.assertNotNull(msg);
+ msg = consumer2.receive(5000);
- msg = consumer3.receive(5000);
+ Assert.assertNotNull(msg);
- Assert.assertNotNull(msg);
+ msg = consumer3.receive(5000);
- msg = consumer4.receive(5000);
+ Assert.assertNotNull(msg);
- Assert.assertNotNull(msg);
+ msg = consumer4.receive(5000);
- msg = consumer5.receive(5000);
+ Assert.assertNotNull(msg);
- Assert.assertNotNull(msg);
- }
+ msg = consumer5.receive(5000);
- session.close();
+ Assert.assertNotNull(msg);
+ }
- server.stop();
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching1() throws Exception
@@ -532,35 +560,43 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
- {
- ClientProducer prod = session.createProducer("address");
+ ClientProducerCredits credits = null;
- ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
-
- if (credits != null)
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
{
- assertTrue(newCredits == credits);
- }
+ ClientProducer prod = session.createProducer("address");
- credits = newCredits;
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ if (credits != null)
+ {
+ Assert.assertTrue(newCredits == credits);
+ }
- session.close();
+ credits = newCredits;
- server.stop();
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager()
+
.unReferencedCreditsSize());
+ }
+
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching2() throws Exception
@@ -568,37 +604,45 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
- {
- ClientProducer prod = session.createProducer("address");
+ ClientProducerCredits credits = null;
- ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
-
- if (credits != null)
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
{
- assertTrue(newCredits == credits);
- }
+ ClientProducer prod = session.createProducer("address");
- credits = newCredits;
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
- prod.close();
+ if (credits != null)
+ {
+ Assert.assertTrue(newCredits == credits);
+ }
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ credits = newCredits;
- session.close();
+ prod.close();
- server.stop();
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager()
+
.unReferencedCreditsSize());
+ }
+
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching3() throws Exception
@@ -606,35 +650,43 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer("address" + i);
+ ClientProducerCredits credits = null;
- ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
-
- if (credits != null)
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
{
- assertFalse(newCredits == credits);
- }
+ ClientProducer prod = session.createProducer("address" + i);
- credits = newCredits;
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
- assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ if (credits != null)
+ {
+ Assert.assertFalse(newCredits == credits);
+ }
- session.close();
+ credits = newCredits;
- server.stop();
+ Assert.assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager()
+
.unReferencedCreditsSize());
+ }
+
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching4() throws Exception
@@ -642,37 +694,45 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer("address" + i);
+ ClientProducerCredits credits = null;
- ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
-
- if (credits != null)
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
{
- assertFalse(newCredits == credits);
- }
+ ClientProducer prod = session.createProducer("address" + i);
- credits = newCredits;
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
- prod.close();
+ if (credits != null)
+ {
+ Assert.assertFalse(newCredits == credits);
+ }
- assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ credits = newCredits;
- session.close();
+ prod.close();
- server.stop();
+ Assert.assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager()
+
.unReferencedCreditsSize());
+ }
+
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching5() throws Exception
@@ -680,63 +740,73 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- List<ClientProducerCredits> creditsList = new ArrayList<ClientProducerCredits>();
+ ClientProducerCredits credits = null;
- for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer("address" + i);
+ List<ClientProducerCredits> creditsList = new ArrayList<ClientProducerCredits>();
- ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
+ for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer("address" + i);
- if (credits != null)
- {
- assertFalse(newCredits == credits);
+ ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
+
+ if (credits != null)
+ {
+ Assert.assertFalse(newCredits == credits);
+ }
+
+ credits = newCredits;
+
+ Assert.assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+
+ creditsList.add(credits);
}
- credits = newCredits;
+ Iterator<ClientProducerCredits> iter = creditsList.iterator();
- assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer("address" + i);
- creditsList.add(credits);
- }
+ ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
- Iterator<ClientProducerCredits> iter = creditsList.iterator();
+ Assert.assertTrue(newCredits == iter.next());
- for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer("address" + i);
+ Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+ ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+ }
- ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
+ for (int i = 0; i < 10; i++)
+ {
+ ClientProducer prod = session.createProducer("address" + (i + ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE));
- assertTrue(newCredits == iter.next());
+ Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE + i + 1,
+ ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+ }
- assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
- ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ session.close();
}
-
- for (int i = 0; i < 10; i++)
+ finally
{
- ClientProducer prod = session.createProducer("address" + (i + ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE));
-
- assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE + i + 1,
- ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ server.stop();
}
-
- session.close();
-
- server.stop();
}
public void testProducerCreditsCaching6() throws Exception
@@ -744,26 +814,35 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer((String)null);
+ session.createQueue("address", "queue1", null, false);
- prod.send("address", session.createMessage(false));
+ for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ prod.send("address", session.createMessage(false));
+
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+ }
+
+ session.close();
}
+ finally
+ {
+ server.stop();
+ }
- session.close();
-
- server.stop();
}
public void testProducerCreditsCaching7() throws Exception
@@ -771,50 +850,58 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer((String)null);
+ session.createQueue("address", "queue1", null, false);
- prod.send("address" + i, session.createMessage(false));
+ for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
- assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ prod.send("address" + i, session.createMessage(false));
- for (int i = 0; i < 10; i++)
- {
- ClientProducer prod = session.createProducer((String)null);
+ Assert.assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+ }
- prod.send("address" + i, session.createMessage(false));
+ for (int i = 0; i < 10; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
- assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
- ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
- ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ prod.send("address" + i, session.createMessage(false));
- for (int i = 0; i < 10; i++)
- {
- ClientProducer prod = session.createProducer((String)null);
+ Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+ ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+ ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
- prod.send("address2-" + i, session.createMessage(false));
+ for (int i = 0; i < 10; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
- assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
- ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
- ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ prod.send("address2-" + i, session.createMessage(false));
- session.close();
+ Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+ ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+ ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
- server.stop();
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsRefCounting() throws Exception
@@ -822,43 +909,50 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducer prod1 = session.createProducer("address");
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ session.createQueue("address", "queue1", null, false);
- ClientProducer prod2 = session.createProducer("address");
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ ClientProducer prod1 = session.createProducer("address");
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- ClientProducer prod3 = session.createProducer("address");
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ ClientProducer prod2 = session.createProducer("address");
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- prod1.close();
+ ClientProducer prod3 = session.createProducer("address");
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ prod1.close();
- prod2.close();
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ prod2.close();
- prod3.close();
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ prod3.close();
- session.close();
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- server.stop();
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
}
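
The producer-credits tests above all converge on one guard pattern: start the broker, wait until it reports started, and keep the test body inside a try block so server.stop() runs even when an assertion fails. A minimal sketch of that skeleton (the method name is hypothetical; createServer, waitForServer, isNetty and locator come from the test base classes touched in this revision):

   public void testSomethingWithFlowControl() throws Exception
   {
      HornetQServer server = createServer(false, isNetty());
      server.start();
      waitForServer(server); // block until the server has fully started

      try
      {
         ClientSessionFactory sf = locator.createSessionFactory();
         ClientSession session = sf.createSession(false, true, true, true);
         session.createQueue("address", "queue1", null, false);

         // ... assertions against the session's ClientProducerCreditManager ...

         session.close();
      }
      finally
      {
         // always stop the server, even if an assertion above failed
         server.stop();
      }
   }
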
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -444,6 +444,7 @@
// Now we will simulate a failure of the bridge connection between server0 and server1
Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
+ assertNotNull(bridge);
RemotingConnection forwardingConnection = getForwardingConnection(bridge);
InVMConnector.failOnCreateConnection = true;
InVMConnector.numberOfFailures = reconnectAttempts - 1;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -20,10 +20,15 @@
import junit.framework.Assert;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -132,7 +137,10 @@
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
server1.start();
+ waitForServer(server1);
+
server0.start();
+ waitForServer(server0);
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
@@ -303,6 +311,7 @@
// Don't start server 1 yet
server0.start();
+ waitForServer(server0);
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
@@ -330,6 +339,8 @@
Thread.sleep(1000);
server1.start();
+ waitForServer(server1);
+
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
ClientSession session1 = sf1.createSession(false, true, true);
@@ -395,6 +406,7 @@
BridgeStartTest.log.info("sent some more messages");
server1.start();
+ waitForServer(server1);
BridgeStartTest.log.info("started server1");
@@ -514,6 +526,7 @@
// Don't start server 1 yet
server0.start();
+ waitForServer(server0);
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
@@ -542,6 +555,8 @@
// JMSBridge should be stopped since retries = 0
server1.start();
+ waitForServer(server1);
+
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
ClientSession session1 = sf1.createSession(false, true, true);
@@ -665,8 +680,10 @@
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
server1.start();
+ waitForServer(server1);
server0.start();
+ waitForServer(server0);
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -434,7 +434,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(2000);
Assert.assertNotNull(message);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -73,7 +73,7 @@
*/
public abstract class ClusterTestBase extends ServiceTestBase
{
- private static final Logger log = Logger.getLogger(ClusterTestBase.class);
+ private final Logger log = Logger.getLogger(this.getClass());
private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
TransportConstants.DEFAULT_PORT + 1,
@@ -88,12 +88,14 @@
private static final long WAIT_TIMEOUT = 10000;
- private static final long TIMEOUT_START_SERVER = 500;
+ private static final long TIMEOUT_START_SERVER = 10;
@Override
protected void setUp() throws Exception
{
super.setUp();
+
+ forceGC();
UnitTestCase.checkFreePort(ClusterTestBase.PORTS);
@@ -116,9 +118,6 @@
locators = new ServerLocator[ClusterTestBase.MAX_SERVERS];
- // To make sure the test will start with a clean VM
- forceGC();
-
}
@Override
@@ -247,7 +246,7 @@
while (System.currentTimeMillis() - start < ClusterTestBase.WAIT_TIMEOUT);
String msg = "Timed out waiting for server starting = " + node;
- ClusterTestBase.log.error(msg);
+ log.error(msg);
throw new IllegalStateException(msg);
}
@@ -283,7 +282,7 @@
topology +
")";
- ClusterTestBase.log.error(msg);
+ log.error(msg);
throw new Exception(msg);
}
@@ -359,7 +358,7 @@
")" +
")";
- ClusterTestBase.log.error(msg);
+ log.error(msg);
Bindings bindings = po.getBindingsForAddress(new SimpleString(address));
@@ -772,7 +771,7 @@
if (message == null)
{
- ClusterTestBase.log.info("*** dumping consumers:");
+ log.info("*** dumping consumers:");
dumpConsumers();
@@ -873,7 +872,7 @@
if (message == null)
{
- ClusterTestBase.log.info("*** dumping consumers:");
+ log.info("*** dumping consumers:");
dumpConsumers();
@@ -920,7 +919,7 @@
{
if (consumers[i] != null && !consumers[i].consumer.isClosed())
{
- ClusterTestBase.log.info("Dumping consumer " + i);
+ log.info("Dumping consumer " + i);
checkReceive(i);
}
@@ -984,13 +983,13 @@
if (message != null)
{
- ClusterTestBase.log.info("check receive Consumer " + consumerID
+
+ log.info("check receive Consumer " + consumerID +
" received message " +
message.getObjectProperty(ClusterTestBase.COUNT_PROP));
}
else
{
- ClusterTestBase.log.info("check receive Consumer " + consumerID
+ " null message");
+ log.info("check receive Consumer " + consumerID + " null
message");
}
}
while (message != null);
@@ -2023,20 +2022,19 @@
for (int node : nodes)
{
log.info("#test start node " + node);
-// if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
-// {
-// Thread.sleep(TIMEOUT_START_SERVER);
-// }
- Thread.sleep(TIMEOUT_START_SERVER);
+ if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
+ {
+ Thread.sleep(TIMEOUT_START_SERVER);
+ }
timeStarts[node] = System.currentTimeMillis();
servers[node].setIdentity("server " + node);
- ClusterTestBase.log.info("starting server " + servers[node]);
+ log.info("starting server " + servers[node]);
servers[node].start();
- ClusterTestBase.log.info("started server " + servers[node]);
+ log.info("started server " + servers[node]);
- ClusterTestBase.log.info("started server " + node);
+ log.info("started server " + node);
waitForServer(servers[node]);
@@ -2053,6 +2051,7 @@
for (ClusterConnection cc : servers[node].getClusterManager().getClusterConnections())
{
cc.stop();
+ cc.flushExecutor();
}
}
}
@@ -2068,23 +2067,22 @@
{
try
{
-// if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
-// {
-// // We can't stop and start a node too fast (faster than what the Topology could realize about this
-// Thread.sleep(TIMEOUT_START_SERVER);
-// }
+ if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
+ {
+ // We can't stop and start a node too fast (faster than what the Topology could realize about this)
+ Thread.sleep(TIMEOUT_START_SERVER);
+ }
- Thread.sleep(TIMEOUT_START_SERVER);
timeStarts[node] = System.currentTimeMillis();
- ClusterTestBase.log.info("stopping server " + node);
+ log.info("stopping server " + node);
servers[node].stop();
- ClusterTestBase.log.info("server " + node + "
stopped");
+ log.info("server " + node + " stopped");
}
catch (Exception e)
{
- ClusterTestBase.log.warn(e.getMessage(), e);
+ log.warn(e.getMessage(), e);
}
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -19,10 +19,8 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
-import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -54,6 +54,11 @@
setupCluster();
startServers(0, 1, 2, 3, 4);
+
+ for (int i = 0 ; i < 5; i++)
+ {
+ waitForTopology(servers[i], 5);
+ }
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -97,6 +102,11 @@
setupCluster();
startServers(0, 1, 2, 3, 4);
+
+ for (int i = 0 ; i < 5; i++)
+ {
+ waitForTopology(servers[i], 5);
+ }
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -88,13 +88,6 @@
waitForBindings(0, "queues.testaddress", 1, 1, true);
- Thread.sleep(2000);
- System.out.println(clusterDescription(servers[0]));
- System.out.println(clusterDescription(servers[1]));
- System.out.println(clusterDescription(servers[2]));
- System.out.println(clusterDescription(servers[3]));
- System.out.println(clusterDescription(servers[4]));
-
waitForBindings(0, "queues.testaddress", 1, 1, false);
send(0, "queues.testaddress", 10, false, null);
@@ -323,8 +316,9 @@
stopServers(2);
- Thread.sleep(2000);
+ waitForTopology(servers[1], 4);
+ Thread.sleep(1000);
log.info("============================================ after stop");
log.info(clusterDescription(servers[0]));
log.info(clusterDescription(servers[1]));
@@ -332,9 +326,12 @@
log.info(clusterDescription(servers[4]));
startServers(2);
+
- Thread.sleep(2000);
+ Thread.sleep(1000);
+ waitForTopology(servers[1], 5);
+
log.info("============================================ after start");
log.info(clusterDescription(servers[0]));
log.info(clusterDescription(servers[1]));
@@ -358,7 +355,6 @@
setupClusterConnection("cluster4-X", 4, -1, "queues", false, 4,
isNetty(), true);
startServers(0, 1, 2, 3, 4);
- Thread.sleep(2000);
Set<ClusterConnection> connectionSet =
getServer(0).getClusterManager().getClusterConnections();
assertNotNull(connectionSet);
assertEquals(1, connectionSet.size());
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -72,7 +72,7 @@
startServers(0);
// Give it a little time for the bridge to try to start
- Thread.sleep(2000);
+ Thread.sleep(500);
stopServers(0);
}
@@ -102,7 +102,11 @@
public void testStartSourceServerBeforeTargetServer() throws Exception
{
startServers(0, 1);
+
+ waitForTopology(servers[0], 2);
+ waitForTopology(servers[1], 2);
+
setupSessionFactory(0, isNetty(), true);
setupSessionFactory(1, isNetty(), true);
@@ -124,6 +128,13 @@
public void testStopAndStartTarget() throws Exception
{
startServers(0, 1);
+
+ waitForTopology(servers[0], 2);
+ waitForTopology(servers[1], 2);
+
+ System.out.println(servers[0].getClusterManager().getTopology().describe());
+
+ System.out.println(servers[1].getClusterManager().getTopology().describe());
setupSessionFactory(0, isNetty(), true);
setupSessionFactory(1, isNetty(), true);
@@ -150,12 +161,14 @@
OnewayTwoNodeClusterTest.log.info("stopping server 1");
stopServers(1);
+
+ waitForTopology(servers[0], 1);
OnewayTwoNodeClusterTest.log.info("restarting server 1(" +
servers[1].getIdentity() + ")");
startServers(1);
- //Thread.sleep(1000);
+ waitForTopology(servers[0], 2);
log.info("Server 1 id=" + servers[1].getNodeID());
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -148,17 +148,22 @@
setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
startServers(0, 1, 2);
+
+ waitForTopology(servers[0], 3);
+ waitForTopology(servers[1], 3);
+ waitForTopology(servers[2], 3);
+
+ for (int i = 0 ; i < 3; i++)
+ {
+ System.out.println("top[" + i + "]=" +
servers[i].getClusterManager().getTopology().describe());
+ }
- for (int i = 0; i < 10; i++)
- log.info("****************************");
for (int i = 0; i <= 2; i++)
{
log.info("*************************************\n " + servers[i] +
" topology:\n" +
servers[i].getClusterManager().getTopology().describe());
}
- for (int i = 0; i < 10; i++)
- log.info("****************************");
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -180,19 +185,52 @@
waitForBindings(2, "queues.testaddress", 2, 2, false);
}
+
+ public void testSimple_TwoNodes() throws Exception
+ {
+ setupServer(0, false, isNetty());
+ setupServer(1, false, isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1);
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0);
+
+ startServers(0, 1);
+
+ for (int i = 0; i <= 1; i++)
+ {
+ log.info("*************************************\n " + servers[i] +
+ " topology:\n" +
+ servers[i].getClusterManager().getTopology().describe());
+ }
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, false);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+ closeAllConsumers();
+
+ }
+
static int loopNumber;
public void _testLoop() throws Throwable
{
- for (int i = 0 ; i < 1000; i++)
+ for (int i = 0 ; i < 10; i++)
{
loopNumber = i;
log.info("#test " + i);
- testSimple2();
- if (i + 1 < 1000)
- {
- tearDown();
- setUp();
- }
+ testSimple();
+ tearDown();
+ setUp();
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -1085,9 +1085,14 @@
public void testRouteWhenNoConsumersFalseNoLocalConsumerLoadBalancedQueues() throws Exception
{
setupCluster(false);
-
+
startServers();
+ for (int i = 0 ; i <= 4; i++)
+ {
+ waitForTopology(servers[i], 5);
+ }
+
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -1241,10 +1246,6 @@
waitForBindings(3, "queues.testaddress", 4, 4, false);
waitForBindings(4, "queues.testaddress", 4, 4, false);
- // this.checkReceive(0, 1, 2, 3, 4);
-
- // Thread.sleep(300000);
-
verifyReceiveAll(10, 0, 1, 2, 3, 4);
}
@@ -1470,7 +1471,6 @@
waitForBindings(3, "queues.testaddress", 6, 6, true);
waitForBindings(4, "queues.testaddress", 7, 7, true);
- Thread.sleep(2000);
System.out.println("#####################################");
System.out.println(clusterDescription(servers[0]));
System.out.println(clusterDescription(servers[1]));
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -128,7 +128,7 @@
stopServers(0, 1);
}
- public void testRestartTest() throws Throwable
+ public void testRestartServers() throws Throwable
{
String name = Thread.currentThread().getName();
try
@@ -136,14 +136,19 @@
Thread.currentThread().setName("ThreadOnTestRestartTest");
startServers(0, 1);
waitForTopology(servers[0], 2);
+
+ System.out.println(servers[0].getClusterManager().getTopology().describe());
+ System.out.println(servers[1].getClusterManager().getTopology().describe());
waitForTopology(servers[1], 2);
- for (int i = 0; i < 5; i++)
+ for (int i = 0; i < 10; i++)
{
+ Thread.sleep(10);
log.info("Sleep #test " + i);
log.info("#stop #test #" + i);
- Thread.sleep(500);
stopServers(1);
+
+ System.out.println(servers[0].getClusterManager().getTopology().describe());
waitForTopology(servers[0], 1, 2000);
log.info("#start #test #" + i);
startServers(1);
@@ -182,17 +187,17 @@
verifyNotReceive(0, 1);
removeConsumer(1);
-
+
closeSessionFactory(1);
-
+
stopServers(1);
-
+
Thread.sleep(12000);
System.out.println(clusterDescription(servers[0]));
startServers(1);
-
+
Thread.sleep(3000);
System.out.println(clusterDescription(servers[0]));
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -103,11 +103,14 @@
liveServer.setIdentity(this.getClass().getSimpleName() + "/liveServer");
liveServer.start();
+
+ waitForServer(liveServer.getServer());
if (backupServer != null)
{
backupServer.setIdentity(this.getClass().getSimpleName() +
"/backupServer");
backupServer.start();
+ waitForServer(backupServer.getServer());
}
}
@@ -433,7 +436,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+ public void nodeUP(final long uniqueEventID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
if (connectorPair.a != null &&
!liveNode.contains(connectorPair.a.getName()))
{
@@ -447,7 +450,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
//To change body of implemented methods use File | Settings | File Templates.
}
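
The nodeUP/nodeDown hunks above and in the listener-related files below all code against the revised ClusterTopologyListener callbacks, which now carry a uniqueEventID. A hedged sketch of what an anonymous listener looks like against that shape (signatures inferred from the hunks in this diff, not copied from the interface source):

   locator.addClusterTopologyListener(new ClusterTopologyListener()
   {
      public void nodeUP(final long uniqueEventID,
                         String nodeID,
                         Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                         boolean last)
      {
         // e.g. record connectorPair.a.getName() as a live node and count down a latch
      }

      public void nodeDown(final long uniqueEventID, String nodeID)
      {
         // e.g. drop nodeID from any local bookkeeping
      }
   });
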
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -20,7 +20,6 @@
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
-
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -34,6 +33,8 @@
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -47,6 +48,7 @@
*/
public abstract class MultipleBackupsFailoverTestBase extends ServiceTestBase
{
+ Logger log = Logger.getLogger(this.getClass());
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -102,7 +104,7 @@
}
}
}
-
+
try
{
Thread.sleep(100);
@@ -170,6 +172,13 @@
protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator,
int topologyMembers) throws Exception
{
+ return createSessionFactoryAndWaitForTopology(locator, topologyMembers, null);
+ }
+
+ protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator,
+ int topologyMembers,
+ HornetQServer server) throws Exception
+ {
ClientSessionFactoryInternal sf;
CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
@@ -179,12 +188,15 @@
sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
+ locator.removeClusterTopologyListener(topListener);
if (!ok)
{
- System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
+ if (server != null)
+ {
+ log.info("failed topology, Topology on server = " +
server.getClusterManager().getTopology().describe());
+ }
}
- locator.removeClusterTopologyListener(topListener);
- assertTrue(ok);
+ assertTrue("expected " + topologyMembers + " members", ok);
return sf;
}
@@ -219,7 +231,10 @@
this.latch = latch;
}
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last)
{
if (connectorPair.a != null &&
!liveNode.contains(connectorPair.a.getName()))
{
@@ -233,7 +248,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
}
}
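
The new createSessionFactoryAndWaitForTopology overload above only adds a server argument that is used for diagnostics when the topology latch times out; passing null falls back to the old behaviour. A hedged usage sketch based on the calls visible elsewhere in this diff:

   // wait for 4 topology members; on timeout the helper logs the server's own topology view
   // before asserting, which makes the failure easier to diagnose
   ClientSessionFactoryInternal sf =
      createSessionFactoryAndWaitForTopology(locator, 4, servers.get(0).getServer());
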
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -25,7 +25,6 @@
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
@@ -42,7 +41,7 @@
{
for (TestableServer testableServer : servers.values())
{
- if(testableServer != null)
+ if (testableServer != null)
{
try
{
@@ -56,43 +55,51 @@
}
super.tearDown();
}
-
+
public void testMultipleFailovers2LiveServers() throws Exception
{
- // TODO: remove these sleeps
NodeManager nodeManager1 = new InVMNodeManager();
NodeManager nodeManager2 = new InVMNodeManager();
createLiveConfig(nodeManager1, 0, 3, 4, 5);
- createBackupConfig(nodeManager1, 0, 1, true, new int[] {0, 2}, 3, 4, 5);
- createBackupConfig(nodeManager1, 0, 2, true, new int[] {0, 1}, 3, 4, 5);
+ createBackupConfig(nodeManager1, 0, 1, true, new int[] { 0, 2 }, 3, 4, 5);
+ createBackupConfig(nodeManager1, 0, 2, true, new int[] { 0, 1 }, 3, 4, 5);
createLiveConfig(nodeManager2, 3, 0);
- createBackupConfig(nodeManager2, 3, 4, true, new int[] {3, 5}, 0, 1, 2);
- createBackupConfig(nodeManager2, 3, 5, true, new int[] {3, 4}, 0, 1, 2);
-
- Thread.sleep(500);
+ createBackupConfig(nodeManager2, 3, 4, true, new int[] { 3, 5 }, 0, 1, 2);
+ createBackupConfig(nodeManager2, 3, 5, true, new int[] { 3, 4 }, 0, 1, 2);
+
servers.get(0).start();
- Thread.sleep(500);
+ waitForServer(servers.get(0).getServer());
+
servers.get(3).start();
- Thread.sleep(500);
+ waitForServer(servers.get(3).getServer());
+
servers.get(1).start();
- Thread.sleep(500);
+ waitForServer(servers.get(1).getServer());
+
servers.get(2).start();
- Thread.sleep(500);
+
servers.get(4).start();
- Thread.sleep(500);
+ waitForServer(servers.get(4).getServer());
+
servers.get(5).start();
+
+ waitForServer(servers.get(4).getServer());
+
ServerLocator locator = getServerLocator(0);
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 4);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 4, servers.get(0).getServer());
ClientSession session = sendAndConsume(sf, true);
System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
+ Thread.sleep(500);
servers.get(0).crash(session);
+ System.out.println("server3 " +
servers.get(3).getServer().getClusterManager().getTopology().describe());
+
int liveAfter0 = waitForNewLive(10000, true, servers, 1, 2);
ServerLocator locator2 = getServerLocator(3);
@@ -139,11 +146,18 @@
}
}
- protected void createBackupConfig(NodeManager nodeManager, int liveNode, int nodeid, boolean createClusterConnections, int[] otherBackupNodes, int... otherClusterNodes)
+ protected void createBackupConfig(NodeManager nodeManager,
+ int liveNode,
+ int nodeid,
+ boolean createClusterConnections,
+ int[] otherBackupNodes,
+ int... otherClusterNodes)
{
Configuration config1 = super.createDefaultConfig();
config1.getAcceptorConfigurations().clear();
- config1.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(), true, generateParams(nodeid, isNetty())));
+ config1.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(),
+ true,
+ generateParams(nodeid, isNetty())));
config1.setSecurityEnabled(false);
config1.setSharedStore(true);
config1.setBackup(true);
@@ -152,21 +166,36 @@
List<String> staticConnectors = new ArrayList<String>();
for (int node : otherBackupNodes)
{
- TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+ TransportConfiguration liveConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(node, isNetty()));
config1.getConnectorConfigurations().put(liveConnector.getName(),
liveConnector);
staticConnectors.add(liveConnector.getName());
}
- TransportConfiguration backupConnector = createTransportConfiguration(isNetty(), false, generateParams(nodeid, isNetty()));
+ TransportConfiguration backupConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(nodeid, isNetty()));
config1.getConnectorConfigurations().put(backupConnector.getName(),
backupConnector);
List<String> clusterNodes = new ArrayList<String>();
for (int node : otherClusterNodes)
{
- TransportConfiguration connector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+ TransportConfiguration connector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(node, isNetty()));
config1.getConnectorConfigurations().put(connector.getName(), connector);
clusterNodes.add(connector.getName());
}
- ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1, clusterNodes, false);
+ ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1",
+ "jms",
+ backupConnector.getName(),
+ -1,
+ false,
+ false,
+ 1,
+ 1,
+ clusterNodes,
+ false);
config1.getClusterConfigurations().add(ccc1);
config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" + liveNode);
@@ -177,25 +206,39 @@
servers.put(nodeid, new SameProcessHornetQServer(createInVMFailoverServer(true, config1, nodeManager, liveNode)));
}
- protected void createLiveConfig(NodeManager nodeManager, int liveNode, int ... otherLiveNodes)
+ protected void createLiveConfig(NodeManager nodeManager, int liveNode, int... otherLiveNodes)
{
- TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(liveNode, isNetty()));
+ TransportConfiguration liveConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(liveNode, isNetty()));
Configuration config0 = super.createDefaultConfig();
config0.getAcceptorConfigurations().clear();
- config0.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(), true, generateParams(liveNode, isNetty())));
+ config0.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(),
+ true,
+ generateParams(liveNode, isNetty())));
config0.setSecurityEnabled(false);
config0.setSharedStore(true);
config0.setClustered(true);
List<String> pairs = new ArrayList<String>();
for (int node : otherLiveNodes)
{
- TransportConfiguration otherLiveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+ TransportConfiguration otherLiveConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(node, isNetty()));
config0.getConnectorConfigurations().put(otherLiveConnector.getName(),
otherLiveConnector);
- pairs.add(otherLiveConnector.getName());
+ pairs.add(otherLiveConnector.getName());
}
- ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
- pairs, false);
+ ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1",
+ "jms",
+ liveConnector.getName(),
+ -1,
+ false,
+ false,
+ 1,
+ 1,
+ pairs,
+ false);
config0.getClusterConfigurations().add(ccc0);
config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
@@ -204,7 +247,8 @@
config0.setPagingDirectory(config0.getPagingDirectory() + "_" + liveNode);
config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() + "_" + liveNode);
- servers.put(liveNode, new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode)));
+ servers.put(liveNode,
+ new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode)));
}
protected boolean isNetty()
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -204,7 +204,8 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
boolean last)
{
@@ -222,7 +223,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
if (nodes.contains(nodeID))
{
@@ -278,7 +279,8 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
boolean last)
{
@@ -289,7 +291,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
if (nodes.contains(nodeID))
{
@@ -350,7 +352,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID, String nodeID,
Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
boolean last)
{
@@ -361,7 +363,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
if (nodes.contains(nodeID))
{
@@ -432,7 +434,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID, String nodeID,
Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
boolean last)
{
@@ -443,7 +445,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
if (nodes.contains(nodeID))
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -12,9 +12,6 @@
*/
package org.hornetq.tests.integration.jms.bridge;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadInfo;
-
import junit.framework.Assert;
import org.hornetq.core.logging.Logger;
@@ -31,6 +28,11 @@
*/
public class JMSBridgeReconnectionTest extends BridgeTestBase
{
+ /**
+ *
+ */
+ private static final int TIME_WAIT = 5000;
+
private static final Logger log = Logger.getLogger(JMSBridgeReconnectionTest.class);
// Crash and reconnect
@@ -175,8 +177,6 @@
bridge.stop();
Assert.assertFalse(bridge.isStarted());
-
- // Thread.sleep(3000);
// we restart and setup the server for the test's tearDown checks
jmsServer1.start();
@@ -245,7 +245,7 @@
// Wait a while before starting up to simulate the dest being down for a while
JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server
back up");
- Thread.sleep(10000);
+ Thread.sleep(TIME_WAIT);
JMSBridgeReconnectionTest.log.info("Done wait");
// Restart the server
@@ -337,7 +337,7 @@
// Wait a while before starting up to simulate the dest being down for a while
JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server
back up");
- Thread.sleep(10000);
+ Thread.sleep(TIME_WAIT);
JMSBridgeReconnectionTest.log.info("Done wait");
// Restart the server
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -296,7 +296,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID, String nodeID,
Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
boolean last)
{
@@ -312,7 +312,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
// To change body of implemented methods use File | Settings | File Templates.
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -2,6 +2,7 @@
import junit.framework.Assert;
+import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.hornetq.tests.util.UnitTestCase;
@@ -15,6 +16,16 @@
*/
public class SpringIntegrationTest extends UnitTestCase
{
+ Logger log = Logger.getLogger(SpringIntegrationTest.class);
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ // Need to force GC as the connection held on the Spring side needs to be cleared,
+ // otherwise the Spring thread may leak here
+ forceGC();
+ }
+
public void testSpring() throws Exception
{
System.out.println("Creating bean factory...");
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -113,9 +113,11 @@
jmsServer1.start();
jmsServer1.activated();
+ waitForServer(jmsServer1.getHornetQServer());
jmsServer2.start();
jmsServer2.activated();
+ waitForServer(jmsServer2.getHornetQServer());
cf1 = (ConnectionFactory)HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(), generateInVMParams(0)));
@@ -228,6 +230,8 @@
log.warn("Can't stop server2", e);
}
+ Thread.sleep(500);
+
((HornetQConnectionFactory)cf1).close();
((HornetQConnectionFactory)cf2).close();
@@ -250,9 +254,9 @@
}
catch (Throwable e)
{
- log.warn("Can't stop server2", e);
+ log.warn("Can't stop server1", e);
}
-
+
server1 = null;
jmsServer1 = null;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSTestBase.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSTestBase.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -122,6 +122,7 @@
Configuration conf = createDefaultConfig(false);
+ conf.getAcceptorConfigurations().clear();
conf.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY));
conf.getConnectorConfigurations().put("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
@@ -181,9 +182,6 @@
mbeanServer = null;
super.tearDown();
-
-
- super.tearDown();
}
// Private -------------------------------------------------------
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -571,7 +571,7 @@
}
catch (Exception e)
{
- throw new IllegalStateException("port " + port + " is already
bound");
+ throw new IllegalStateException("port " + port + " is
bound");
}
finally
{
@@ -967,6 +967,8 @@
logAndSystemOut("Thread leaked on test " + this.getClass().getName() +
"::" +
this.getName() + "\n" + buffer.toString());
logAndSystemOut("Thread leakage");
+
+ fail("Thread leaked");
}
super.tearDown();