JBoss hornetq SVN: r11236 - branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-29 15:59:41 -0400 (Mon, 29 Aug 2011)
New Revision: 11236
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
tweaks
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-29 19:49:15 UTC (rev 11235)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-29 19:59:41 UTC (rev 11236)
@@ -504,7 +504,7 @@
serverLocator.start(server.getExecutorFactory().getExecutor());
- serverLocator.getExecutor().execute(new Runnable(){
+ /* serverLocator.getExecutor().execute(new Runnable(){
public void run()
{
try
@@ -522,7 +522,7 @@
log.warn("Error on connectin Cluster connection to other nodes", e);
}
}
- });
+ });*/
}
if (managementService != null)
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-29 19:49:15 UTC (rev 11235)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-29 19:59:41 UTC (rev 11236)
@@ -202,15 +202,10 @@
if (connectorName == null)
{
connectorName = config.getConnectorName();
+ break;
}
- else if (!connectorName.equals(config.getConnectorName()))
- {
- throw new IllegalStateException("Using multiple connector names on cluster connections it's "
- + "not supported at this time");
- }
+ }
- }
-
if (connectorName != null)
{
TransportConfiguration nodeConnector = configuration.getConnectorConfigurations().get(connectorName);
13 years, 3 months
JBoss hornetq SVN: r11235 - branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-29 15:49:15 -0400 (Mon, 29 Aug 2011)
New Revision: 11235
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
test for my branch only
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-29 16:20:36 UTC (rev 11234)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-29 19:49:15 UTC (rev 11235)
@@ -528,6 +528,24 @@
initialise();
this.startExecutor = executor;
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ if (!closing)
+ {
+ log.warn("did not connect the cluster connection to other nodes", e);
+ }
+ }
+ }
+ });
}
public Executor getExecutor()
13 years, 3 months
JBoss hornetq SVN: r11234 - in branches/Branch_2_2_EAP_cluster_clean3: src/main/org/hornetq/api/core/client and 12 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-29 12:20:36 -0400 (Mon, 29 Aug 2011)
New Revision: 11234
Added:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/config/common/hornetq-version.properties
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/Channel.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
Log:
Changes I'm planning
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/config/common/hornetq-version.properties
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/config/common/hornetq-version.properties 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/config/common/hornetq-version.properties 2011-08-29 16:20:36 UTC (rev 11234)
@@ -2,8 +2,8 @@
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=8
-hornetq.version.incrementingVersion=121
+hornetq.version.incrementingVersion=122
hornetq.version.versionSuffix=CR1
hornetq.version.versionTag=CR1
hornetq.netty.version=@NETTY.VERSION@
-hornetq.version.compatibleVersionList=121
+hornetq.version.compatibleVersionList=121,122
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -26,7 +26,7 @@
*/
public interface ClusterTopologyListener
{
- void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+ void nodeUP(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
- void nodeDown(String nodeID);
+ void nodeDown(long eventUID, String nodeID);
}
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -43,12 +43,13 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
-import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.version.Version;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -1291,9 +1292,13 @@
ClientSessionFactoryImpl.log.trace(this + "::Subscribing Topology");
}
- channel0.send(new SubscribeClusterTopologyUpdatesMessage(serverLocator.isClusterConnection()));
+ channel0.send(new SubscribeClusterTopologyUpdatesMessageV2(serverLocator.isClusterConnection(), VersionLoader.getVersion().getIncrementingVersion()));
+
+
if (serverLocator.isClusterConnection())
{
+
+ new Exception("Announcing node::").printStackTrace();
TransportConfiguration config = serverLocator.getClusterTransportConfiguration();
if (ClientSessionFactoryImpl.isDebug)
{
@@ -1301,7 +1306,8 @@
", isBackup=" +
serverLocator.isBackup());
}
- channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(), serverLocator.isBackup(), config));
+ sendNodeAnnounce(System.currentTimeMillis(), serverLocator.getNodeID(), serverLocator.isBackup(), config, null);
+ //channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(), serverLocator.isBackup(), config));
}
}
}
@@ -1314,6 +1320,23 @@
return connection;
}
+ /**
+ * @param channel0
+ */
+ public void sendNodeAnnounce(final long currentEventID, String nodeID, boolean isBackup, TransportConfiguration config, TransportConfiguration backupConfig)
+ {
+ Channel channel0 = connection.getChannel(0, -1);
+ if (ClientSessionFactoryImpl.isDebug)
+ {
+ ClientSessionFactoryImpl.log.debug("Announcing node " + serverLocator.getNodeID() +
+ ", isBackup=" + isBackup);
+ }
+ ClientSessionFactoryImpl.log.info("YYY Announcing node " + serverLocator.getNodeID() + ", config=" + config +
+ ", backup=" + backupConfig +
+ ", isBackup=" + isBackup);
+ channel0.send(new NodeAnnounceMessage(currentEventID, nodeID, isBackup, config, backupConfig));
+ }
+
@Override
public void finalize() throws Throwable
{
@@ -1439,7 +1462,7 @@
if (nodeID != null)
{
- serverLocator.notifyNodeDown(msg.getNodeID().toString());
+ serverLocator.notifyNodeDown(System.currentTimeMillis(), msg.getNodeID().toString());
}
closeExecutor.execute(new Runnable()
@@ -1456,6 +1479,8 @@
}
else if (type == PacketImpl.CLUSTER_TOPOLOGY)
{
+ log.warn("Server is sending packets from an older version. " +
+ "You must update all the servers to the same version on a cluster!");
ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage)packet;
if (topMessage.isExit())
@@ -1464,7 +1489,7 @@
{
ClientSessionFactoryImpl.log.debug("Notifying " + topMessage.getNodeID() + " going down");
}
- serverLocator.notifyNodeDown(topMessage.getNodeID());
+ serverLocator.notifyNodeDown(System.currentTimeMillis(), topMessage.getNodeID());
}
else
{
@@ -1478,9 +1503,36 @@
" csf created at\nserverLocator=" +
serverLocator, e);
}
- serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+ serverLocator.notifyNodeUp(System.currentTimeMillis(), topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
}
}
+ else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2)
+ {
+ ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2)packet;
+
+ if (topMessage.isExit())
+ {
+ if (ClientSessionFactoryImpl.isDebug)
+ {
+ ClientSessionFactoryImpl.log.debug("Notifying " + topMessage.getNodeID() + " going down");
+ }
+ serverLocator.notifyNodeDown(topMessage.getUniqueEventID(), topMessage.getNodeID());
+ }
+ else
+ {
+ if (ClientSessionFactoryImpl.isDebug)
+ {
+ ClientSessionFactoryImpl.log.debug("Node " + topMessage.getNodeID() +
+ " going up, connector = " +
+ topMessage.getPair() +
+ ", isLast=" +
+ topMessage.isLast() +
+ " csf created at\nserverLocator=" +
+ serverLocator, e);
+ }
+ serverLocator.notifyNodeUp(topMessage.getUniqueEventID(), topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+ }
+ }
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -42,6 +42,8 @@
void removeSession(final ClientSessionInternal session, boolean failingOver);
void connect(int reconnectAttempts, boolean failoverOnInitialConnection) throws HornetQException;
+
+ void sendNodeAnnounce(final long currentEventID, String nodeID, boolean isBackup, TransportConfiguration config, TransportConfiguration backupConfig);
TransportConfiguration getConnectorConfiguration();
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -483,7 +483,9 @@
* @param discoveryAddress
* @param discoveryPort
*/
- public ServerLocatorImpl(final Topology topology, final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
+ public ServerLocatorImpl(final Topology topology,
+ final boolean useHA,
+ final DiscoveryGroupConfiguration groupConfiguration)
{
this(topology, useHA, groupConfiguration, null);
@@ -526,25 +528,12 @@
initialise();
this.startExecutor = executor;
-
- executor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- connect();
- }
- catch (Exception e)
- {
- if (!closing)
- {
- log.warn("did not connect the cluster connection to other nodes", e);
- }
- }
- }
- });
}
+
+ public Executor getExecutor()
+ {
+ return startExecutor;
+ }
/* (non-Javadoc)
* @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
@@ -554,7 +543,7 @@
finalizeCheck = false;
}
- public ClientSessionFactory connect() throws Exception
+ public ClientSessionFactoryInternal connect() throws Exception
{
ClientSessionFactoryInternal sf;
// static list of initial connectors
@@ -1244,7 +1233,10 @@
closed = true;
}
- public void notifyNodeDown(final String nodeID)
+ /** This is directly called when the connection to the node is gone,
+ * or when the node sends a disconnection.
+ * Look for callers of this method! */
+ public void notifyNodeDown(final long eventTime, final String nodeID)
{
if (topology == null)
@@ -1258,27 +1250,31 @@
log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
}
- topology.removeMember(nodeID);
-
- if (!topology.isEmpty())
+ if (topology.removeMember(eventTime, nodeID))
{
- updateArraysAndPairs();
-
- if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
+ if (topology.isEmpty())
{
+ // Resetting the topology to its original condition as it was brand new
+ topologyArray = null;
+
receivedTopology = false;
}
- }
- else
- {
- topologyArray = null;
+ else
+ {
+ updateArraysAndPairs();
- receivedTopology = false;
+ if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
+ {
+ // Resetting the topology to its original condition as it was brand new
+ receivedTopology = false;
+ }
+ }
}
}
- public void notifyNodeUp(final String nodeID,
+ public void notifyNodeUp(long uniqueEventID,
+ final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
@@ -1293,33 +1289,33 @@
log.debug("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
}
- topology.addMember(nodeID, new TopologyMember(connectorPair), last);
+ TopologyMember member = new TopologyMember(connectorPair.a, connectorPair.b);
- TopologyMember actMember = topology.getMember(nodeID);
-
- if (actMember != null && actMember.getConnector().a != null && actMember.getConnector().b != null)
+ if (topology.updateMember(uniqueEventID, nodeID, member))
{
- for (ClientSessionFactory factory : factories)
+
+ TopologyMember actMember = topology.getMember(nodeID);
+
+ if (actMember != null && actMember.getConnector().a != null && actMember.getConnector().b != null)
{
- ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
- actMember.getConnector().b);
+ for (ClientSessionFactory factory : factories)
+ {
+ ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
+ actMember.getConnector().b);
+ }
}
- }
- if (connectorPair.a != null)
- {
updateArraysAndPairs();
}
- synchronized (this)
+ if (last)
{
- if (last)
+ synchronized (this)
{
receivedTopology = true;
+ // Notify if waiting on getting topology
+ notifyAll();
}
-
- // Notify if waiting on getting topology
- notifyAll();
}
}
@@ -1569,9 +1565,9 @@
threadPool,
scheduledThreadPool,
interceptors);
-
+
factory.disableFinalizeCheck();
-
+
connectors.add(new Connector(initialConnector, factory));
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -31,6 +31,8 @@
{
void start(Executor executor) throws Exception;
+ Executor getExecutor();
+
void factoryClosed(final ClientSessionFactory factory);
/** Used to better identify Cluster Connection Locators on logs while debugging logs */
@@ -42,11 +44,16 @@
void cleanup();
- ClientSessionFactory connect() throws Exception;
+ ClientSessionFactoryInternal connect() throws Exception;
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+ void notifyNodeUp(long uniqueEventID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
- void notifyNodeDown(String nodeID);
+ /**
+ *
+ * @param uniqueEventID 0 means get the previous ID +1
+ * @param nodeID
+ */
+ void notifyNodeDown(long uniqueEventID, String nodeID);
void setClusterConnection(boolean clusterConnection);
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -64,6 +64,8 @@
* values are a pair of live/backup transport configurations
*/
private final Map<String, TopologyMember> mapTopology = new ConcurrentHashMap<String, TopologyMember>();
+
+ private final Map<String, Long> mapDelete = new ConcurrentHashMap<String, Long>();
public Topology(final Object owner)
{
@@ -104,22 +106,83 @@
}
}
- public boolean addMember(final String nodeId, final TopologyMember memberInput, final boolean last)
+ /** This is called by the server when the node is activated from backup state. It will always succeed */
+ public void updateAsLive(final String nodeId, final TopologyMember memberInput)
{
synchronized (this)
{
+ if (log.isDebugEnabled())
+ {
+ log.info(this + "::Live node " + nodeId + "=" + memberInput);
+ }
+ memberInput.setUniqueEventID(System.currentTimeMillis());
+ mapTopology.remove(nodeId);
+ mapTopology.put(nodeId, memberInput);
+ sendMemberUp(memberInput.getUniqueEventID(), nodeId, memberInput);
+ }
+ }
+
+ /** This is called by the server when the node is activated from backup state. It will always succeed */
+ public TopologyMember updateBackup(final String nodeId, final TopologyMember memberInput)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::updateBackup::" + nodeId + ", memberInput=" + memberInput);
+ }
+
+ synchronized (this)
+ {
+ // TODO treat versioning here. it should remove any previous version
+ // However, if the previous version has a higher time (say if the node time where the system died), we should
+ // use that number ++
+
+ TopologyMember currentMember = getMember(nodeId);
+ if (currentMember == null)
+ {
+ log.warn("There's no live to be updated on backup update", new Exception("trace"));
+ }
+
+ TopologyMember newMember = new TopologyMember(currentMember.getConnector().a, memberInput.getConnector().b);
+ newMember.setUniqueEventID(System.currentTimeMillis());
+ mapTopology.remove(nodeId);
+ mapTopology.put(nodeId, newMember);
+ sendMemberUp(newMember.getUniqueEventID(), nodeId, newMember);
+
+ return newMember;
+ }
+ }
+
+ /**
+ *
+ * @param <p>uniqueIdentifier an unique identifier for when the change was made
+ * We will use current time millis for starts, and a ++ of that number for shutdown. </p>
+ * @param nodeId
+ * @param memberInput
+ * @return
+ */
+ public boolean updateMember(final long uniqueEventID, final String nodeId, final TopologyMember memberInput)
+ {
+ Long deleteTme = mapDelete.get(nodeId);
+ if (deleteTme != null && uniqueEventID < deleteTme)
+ {
+ return false;
+ }
+
+ if (log.isTraceEnabled())
+ {
+ // log.trace(this + "::UpdateMember::" + uniqueEventID + ", nodeID=" + nodeId + ", memberInput=" + memberInput);
+ }
+
+ synchronized (this)
+ {
TopologyMember currentMember = mapTopology.get(nodeId);
if (currentMember == null)
{
- if (!testBackof(nodeId))
- {
- return false;
- }
-
if (Topology.log.isDebugEnabled())
{
- Topology.log.debug(this + "::NewMemeberAdd " + this +
+ Topology.log.debug(this + "::NewMemeberAdd " +
+ this +
" MEMBER WAS NULL, Add member nodeId=" +
nodeId +
" member = " +
@@ -127,70 +190,42 @@
" size = " +
mapTopology.size(), new Exception("trace"));
}
+ memberInput.setUniqueEventID(uniqueEventID);
mapTopology.put(nodeId, memberInput);
- sendMemberUp(nodeId, memberInput);
+ sendMemberUp(uniqueEventID, nodeId, memberInput);
return true;
}
else
{
- if (log.isTraceEnabled())
+ if (uniqueEventID > currentMember.getUniqueEventID())
{
- log.trace(this + ":: validating update for currentMember=" + currentMember + " of memberInput=" + memberInput);
- }
+ log.info(this + "::updated currentMember=nodeID=" + nodeId +
+ currentMember +
+ " of memberInput=" +
+ memberInput, new Exception ("trace"));
- boolean replaced = false;
- TopologyMember memberToSend = currentMember;
+ TopologyMember newMember = new TopologyMember(memberInput.getConnector().a, memberInput.getConnector().b);
+ newMember.setUniqueEventID(uniqueEventID);
+ mapTopology.remove(nodeId);
+ mapTopology.put(nodeId, newMember);
+ sendMemberUp(uniqueEventID, nodeId, newMember);
- if (hasChanged("a", memberToSend.getConnector().a, memberInput.getConnector().a))
- {
- if (!replaced && !testBackof(nodeId))
- {
- return false;
- }
- memberToSend = new TopologyMember(memberInput.getConnector().a, memberToSend.getConnector().b);
- replaced = true;
+ return true;
}
-
- if (hasChanged("b", memberToSend.getConnector().b, memberInput.getConnector().b))
+ else
{
- if (!replaced && !testBackof(nodeId))
- {
- return false;
- }
- memberToSend = new TopologyMember(memberToSend.getConnector().a, memberInput.getConnector().b);
- replaced = true;
+ return false;
}
-
- if (replaced)
- {
- mapTopology.remove(nodeId);
- mapTopology.put(nodeId, memberToSend);
-
- sendMemberUp(nodeId, memberToSend);
- return true;
- }
-
}
}
-
- if (Topology.log.isDebugEnabled())
- {
- Topology.log.debug(Topology.this + " Add member nodeId=" +
- nodeId +
- " member = " +
- memberInput +
- " has been ignored since there was no change", new Exception("trace"));
- }
-
- return false;
}
/**
* @param nodeId
* @param memberToSend
*/
- private void sendMemberUp(final String nodeId, final TopologyMember memberToSend)
+ private void sendMemberUp(final long uniqueEventID, final String nodeId, final TopologyMember memberToSend)
{
final ArrayList<ClusterTopologyListener> copy = copyListeners();
@@ -207,12 +242,17 @@
{
if (Topology.log.isTraceEnabled())
{
- Topology.log.trace(Topology.this + " informing " + listener + " about node up = " + nodeId);
+ Topology.log.trace(Topology.this + " informing " +
+ listener +
+ " about node up = " +
+ nodeId +
+ " connector = " +
+ memberToSend.getConnector());
}
try
{
- listener.nodeUP(nodeId, memberToSend.getConnector(), false);
+ listener.nodeUP(uniqueEventID, nodeId, memberToSend.getConnector(), false);
}
catch (Throwable e)
{
@@ -276,28 +316,26 @@
return listenersCopy;
}
- public boolean removeMember(final String nodeId)
+ public boolean removeMember(final long uniqueEventID, final String nodeId)
{
TopologyMember member;
synchronized (this)
{
- Pair<Long, Integer> value = mapBackof.get(nodeId);
-
- if (value == null)
+ member = mapTopology.get(nodeId);
+ if (member != null)
{
- value = new Pair<Long, Integer>(0l, 0);
- mapBackof.put(nodeId, value);
+ if (member.getUniqueEventID() > uniqueEventID)
+ {
+ log.info("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
+ member = null;
+ }
+ else
+ {
+ mapDelete.put(nodeId, uniqueEventID);
+ member = mapTopology.remove(nodeId);
+ }
}
-
- value.a = System.currentTimeMillis();
-
- if (System.currentTimeMillis() - value.a > BACKOF_TIMEOUT)
- {
- value.b = 0;
- }
-
- member = mapTopology.remove(nodeId);
}
if (Topology.log.isDebugEnabled())
@@ -327,7 +365,7 @@
}
try
{
- listener.nodeDown(nodeId);
+ listener.nodeDown(uniqueEventID, nodeId);
}
catch (Exception e)
{
@@ -354,14 +392,13 @@
}
/**
- * it will send all the member updates to listeners, independently of being changed or not
+ * it will send the member to its listeners
* @param nodeID
* @param member
*/
- public void sendMemberToListeners(final String nodeID, final TopologyMember member)
+ public void sendMember(final String nodeID)
{
- // To make sure it was updated
- addMember(nodeID, member, false);
+ final TopologyMember member = getMember(nodeID);
final ArrayList<ClusterTopologyListener> copy = copyListeners();
@@ -380,7 +417,7 @@
" with connector=" +
member.getConnector());
}
- listener.nodeUP(nodeID, member.getConnector(), false);
+ listener.nodeUP(member.getUniqueEventID(), nodeID, member.getConnector(), false);
}
}
});
@@ -417,18 +454,21 @@
" to " +
listener);
}
- listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == copy.size());
+ listener.nodeUP(entry.getValue().getUniqueEventID(),
+ entry.getKey(),
+ entry.getValue().getConnector(),
+ ++count == copy.size());
}
}
});
}
- public TopologyMember getMember(final String nodeID)
+ public synchronized TopologyMember getMember(final String nodeID)
{
return mapTopology.get(nodeID);
}
- public boolean isEmpty()
+ public synchronized boolean isEmpty()
{
return mapTopology.isEmpty();
}
@@ -506,8 +546,10 @@
if (log.isTraceEnabled())
{
- log.trace(this + "::Validating current=" + a
- + " != input=" + b +
+ log.trace(this + "::Validating current=" +
+ a +
+ " != input=" +
+ b +
(changed ? " and it has changed" : " and it didn't change") +
", for validation of " +
debugInfo);
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -27,9 +27,13 @@
private final Pair<TransportConfiguration, TransportConfiguration> connector;
+ /** transient to avoid serialization changes */
+ private transient long uniqueEventID = System.currentTimeMillis();
+
public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector)
{
this.connector = connector;
+ this.uniqueEventID = System.currentTimeMillis();
}
public TopologyMember(TransportConfiguration a, TransportConfiguration b)
@@ -37,11 +41,28 @@
this(new Pair<TransportConfiguration, TransportConfiguration>(a, b));
}
+
+ /**
+ * @return the uniqueEventID
+ */
+ public long getUniqueEventID()
+ {
+ return uniqueEventID;
+ }
+
+ /**
+ * @param uniqueEventID the uniqueEventID to set
+ */
+ public void setUniqueEventID(long uniqueEventID)
+ {
+ this.uniqueEventID = uniqueEventID;
+ }
+
public Pair<TransportConfiguration, TransportConfiguration> getConnector()
{
return connector;
}
-
+
@Override
public String toString()
{
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/Channel.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/Channel.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/Channel.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -36,6 +36,9 @@
* @return the id
*/
long getID();
+
+ /** Protocol check: whether the client version on this channel's connection supports the given packet type */
+ boolean supports(byte packetID);
/**
* sends a packet on this channel.
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -25,6 +25,15 @@
*/
public interface CoreRemotingConnection extends RemotingConnection
{
+
+ /** The client protocol version used on this connection.
+ * It determines whether the client supports certain packet types. */
+ int getClientVersion();
+
+ /** Sets the client protocol version used on this connection.
+ * It determines whether the client supports certain packet types. */
+ void setClientVersion(int clientVersion);
+
/**
* return the channel with the channel id specified.
* <p/>
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -94,6 +94,19 @@
resendCache = null;
}
}
+
+ public boolean supports(final byte packetType)
+ {
+ int version = connection.getClientVersion();
+
+ switch (packetType)
+ {
+ case PacketImpl.CLUSTER_TOPOLOGY_V2:
+ return version >= 122;
+ default:
+ return true;
+ }
+ }
public long getID()
{
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -32,9 +32,11 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.ConnectionEntry;
@@ -112,13 +114,19 @@
// Just send a ping back
channel0.send(packet);
}
- else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY)
+ else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY || packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2)
{
SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage)packet;
+ if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2)
+ {
+ channel0.getConnection().setClientVersion(((SubscribeClusterTopologyUpdatesMessageV2)msg).getClientVersion());
+ }
+
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
- public void nodeUP(final String nodeID,
+ public void nodeUP(final long uniqueEventID,
+ final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
@@ -129,12 +137,19 @@
{
public void run()
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+ if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2))
+ {
+ channel0.send(new ClusterTopologyChangeMessage_V2(uniqueEventID, nodeID, connectorPair, last));
+ }
+ else
+ {
+ channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+ }
}
});
}
- public void nodeDown(final String nodeID)
+ public void nodeDown(final long uniqueEventID, final String nodeID)
{
// Using an executor as most of the notifications on the Topology
// may come from a channel itself
@@ -143,7 +158,14 @@
{
public void run()
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID));
+ if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2))
+ {
+ channel0.send(new ClusterTopologyChangeMessage_V2(uniqueEventID, nodeID));
+ }
+ else
+ {
+ channel0.send(new ClusterTopologyChangeMessage(nodeID));
+ }
}
});
}
@@ -177,13 +199,14 @@
}
else
{
- pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), msg.getBackupConnector());
}
if (isTrace)
{
log.trace("Server " + server + " receiving nodeUp from NodeID=" + msg.getNodeID() + ", pair=" + pair);
}
- server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, true);
+ log.info(server + "::YYY Topology Server " + server + " receiving nodeUp from NodeID=" + msg.getNodeID() + ", pair=" + pair);
+ server.getClusterManager().nodeAnnounced(msg.getCurrentEventID(), msg.getNodeID(), pair, msg.isBackup());
}
}
});
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -163,6 +163,17 @@
"Server will not accept create session requests");
}
+ if (connection.getClientVersion() == 0)
+ {
+ connection.setClientVersion(request.getVersion());
+ }
+ else if (connection.getClientVersion() != request.getVersion())
+ {
+ log.warn("Client is not being consistent on the request versioning. " +
+ "It just sent a version id=" + request.getVersion() +
+ " while it informed " + connection.getClientVersion() + " previously");
+ }
+
Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
ServerSession session = server.createSession(request.getName(),
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -14,6 +14,7 @@
package org.hornetq.core.protocol.core.impl;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY_V2;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATESESSION;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATESESSION_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
@@ -83,11 +84,13 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_START;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY_V2;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
@@ -152,6 +155,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
/**
* A PacketDecoder
@@ -504,6 +508,11 @@
packet = new ClusterTopologyChangeMessage();
break;
}
+ case CLUSTER_TOPOLOGY_V2:
+ {
+ packet = new ClusterTopologyChangeMessage_V2();
+ break;
+ }
case NODE_ANNOUNCE:
{
packet = new NodeAnnounceMessage();
@@ -514,6 +523,11 @@
packet = new SubscribeClusterTopologyUpdatesMessage();
break;
}
+ case SUBSCRIBE_TOPOLOGY_V2:
+ {
+ packet = new SubscribeClusterTopologyUpdatesMessageV2();
+ break;
+ }
case SESS_ADD_METADATA:
{
packet = new SessionAddMetaDataMessage();
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -193,7 +193,13 @@
public static final byte NODE_ANNOUNCE = 111;
public static final byte SUBSCRIBE_TOPOLOGY = 112;
+
+ // For newer versions
+ public static final byte SUBSCRIBE_TOPOLOGY_V2 = 113;
+
+ public static final byte CLUSTER_TOPOLOGY_V2 = 114;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -72,6 +72,8 @@
private volatile boolean destroyed;
private final boolean client;
+
+ private int clientVersion;
// Channels 0-9 are reserved for the system
// 0 is for pinging
@@ -183,7 +185,23 @@
failureListeners.addAll(listeners);
}
+
+ /**
+ * @return the clientVersion
+ */
+ public int getClientVersion()
+ {
+ return clientVersion;
+ }
+ /**
+ * @param clientVersion the clientVersion to set
+ */
+ public void setClientVersion(int clientVersion)
+ {
+ this.clientVersion = clientVersion;
+ }
+
public Object getID()
{
return transportConnection.getID();
Added: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java (rev 0)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author Clebert Suconic
+ *
+ */
+public class ClusterTopologyChangeMessage_V2 extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private boolean exit;
+
+ private String nodeID;
+
+ private Pair<TransportConfiguration, TransportConfiguration> pair;
+
+ private long uniqueEventID;
+
+ private boolean last;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last)
+ {
+ super(PacketImpl.CLUSTER_TOPOLOGY_V2);
+
+ this.nodeID = nodeID;
+
+ this.pair = pair;
+
+ this.last = last;
+
+ this.exit = false;
+
+ this.uniqueEventID = uniqueEventID;
+ }
+
+ public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID)
+ {
+ super(PacketImpl.CLUSTER_TOPOLOGY_V2);
+
+ this.exit = true;
+
+ this.nodeID = nodeID;
+
+ this.uniqueEventID = uniqueEventID;
+ }
+
+ public ClusterTopologyChangeMessage_V2()
+ {
+ super(PacketImpl.CLUSTER_TOPOLOGY_V2);
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public Pair<TransportConfiguration, TransportConfiguration> getPair()
+ {
+ return pair;
+ }
+
+ public boolean isLast()
+ {
+ return last;
+ }
+
+ /**
+ * @return the uniqueEventID
+ */
+ public long getUniqueEventID()
+ {
+ return uniqueEventID;
+ }
+
+ public boolean isExit()
+ {
+ return exit;
+ }
+
+
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeBoolean(exit);
+ buffer.writeString(nodeID);
+ buffer.writeLong(uniqueEventID);
+ if (!exit)
+ {
+ if (pair.a != null)
+ {
+ buffer.writeBoolean(true);
+ pair.a.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
+ if (pair.b != null)
+ {
+ buffer.writeBoolean(true);
+ pair.b.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
+ buffer.writeBoolean(last);
+ }
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ exit = buffer.readBoolean();
+ nodeID = buffer.readString();
+ uniqueEventID = buffer.readLong();
+ if (!exit)
+ {
+ boolean hasLive = buffer.readBoolean();
+ TransportConfiguration a;
+ if(hasLive)
+ {
+ a = new TransportConfiguration();
+ a.decode(buffer);
+ }
+ else
+ {
+ a = null;
+ }
+ boolean hasBackup = buffer.readBoolean();
+ TransportConfiguration b;
+ if (hasBackup)
+ {
+ b = new TransportConfiguration();
+ b.decode(buffer);
+ }
+ else
+ {
+ b = null;
+ }
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(a, b);
+ last = buffer.readBoolean();
+ }
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -34,21 +34,29 @@
private boolean backup;
+ private long currentEventID;
+
private TransportConfiguration connector;
+ private TransportConfiguration backupConnector;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public NodeAnnounceMessage(final String nodeID, final boolean backup, final TransportConfiguration tc)
+ public NodeAnnounceMessage(final long currentEventID, final String nodeID, final boolean backup, final TransportConfiguration tc, final TransportConfiguration backupConnector)
{
super(PacketImpl.NODE_ANNOUNCE);
+ this.currentEventID = currentEventID;
+
this.nodeID = nodeID;
this.backup = backup;
this.connector = tc;
+
+ this.backupConnector = backupConnector;
}
public NodeAnnounceMessage()
@@ -74,13 +82,43 @@
return connector;
}
+ public TransportConfiguration getBackupConnector()
+ {
+ return backupConnector;
+ }
+ /**
+ * @return the currentEventID
+ */
+ public long getCurrentEventID()
+ {
+ return currentEventID;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeString(nodeID);
buffer.writeBoolean(backup);
- connector.encode(buffer);
+ buffer.writeLong(currentEventID);
+ if (connector != null)
+ {
+ buffer.writeBoolean(true);
+ connector.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
+ if (backupConnector != null)
+ {
+ buffer.writeBoolean(true);
+ backupConnector.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
}
@Override
@@ -88,8 +126,17 @@
{
this.nodeID = buffer.readString();
this.backup = buffer.readBoolean();
- connector = new TransportConfiguration();
- connector.decode(buffer);
+ this.currentEventID = buffer.readLong();
+ if (buffer.readBoolean())
+ {
+ connector = new TransportConfiguration();
+ connector.decode(buffer);
+ }
+ if (buffer.readBoolean())
+ {
+ backupConnector = new TransportConfiguration();
+ backupConnector.decode(buffer);
+ }
}
/* (non-Javadoc)
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -42,11 +42,23 @@
this.clusterConnection = clusterConnection;
}
+ protected SubscribeClusterTopologyUpdatesMessage(byte packetType, final boolean clusterConnection)
+ {
+ super(packetType);
+
+ this.clusterConnection = clusterConnection;
+ }
+
public SubscribeClusterTopologyUpdatesMessage()
{
super(PacketImpl.SUBSCRIBE_TOPOLOGY);
}
+ protected SubscribeClusterTopologyUpdatesMessage(byte packetType)
+ {
+ super(packetType);
+ }
+
// Public --------------------------------------------------------
public boolean isClusterConnection()
Added: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java (rev 0)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class SubscribeClusterTopologyUpdatesMessageV2 extends SubscribeClusterTopologyUpdatesMessage
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private int clientVersion;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SubscribeClusterTopologyUpdatesMessageV2(final boolean clusterConnection, int clientVersion)
+ {
+ super(PacketImpl.SUBSCRIBE_TOPOLOGY_V2, clusterConnection);
+
+ this.clientVersion = clientVersion;
+ }
+
+ public SubscribeClusterTopologyUpdatesMessageV2()
+ {
+ super(PacketImpl.SUBSCRIBE_TOPOLOGY_V2);
+ }
+
+ // Public --------------------------------------------------------
+
+
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ super.encodeRest(buffer);
+ buffer.writeInt(clientVersion);
+ }
+
+ /**
+ * @return the clientVersion
+ */
+ public int getClientVersion()
+ {
+ return clientVersion;
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ super.decodeRest(buffer);
+ clientVersion = buffer.readInt();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -48,10 +48,8 @@
void activate();
- void notifyNodeDown(String nodeID);
+ void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, boolean nodeAnnounce);
-
Topology getTopology();
void flushExecutor();
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -101,7 +101,7 @@
private final Transformer transformer;
- private volatile ClientSessionFactory csf;
+ private volatile ClientSessionFactoryInternal csf;
protected volatile ClientSessionInternal session;
@@ -200,6 +200,7 @@
{
this.notificationService = notificationService;
}
+
public synchronized void start() throws Exception
{
if (started)
@@ -647,6 +648,15 @@
}
}
+ /**
+ * @return the session factory currently held by this bridge
+ */
+ protected ClientSessionFactoryInternal getCurrentFactory()
+ {
+ return csf;
+ }
+
+
/* Hook for creating session factory */
protected ClientSessionFactoryInternal createSessionFactory() throws Exception
{
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -29,6 +29,7 @@
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.StorageManager;
@@ -71,6 +72,8 @@
private final SimpleString idsHeaderName;
private final String targetNodeID;
+
+ private final long targetNodeEventUID;
private final TransportConfiguration connector;
@@ -85,6 +88,7 @@
final double retryMultiplier,
final long maxRetryInterval,
final UUID nodeUUID,
+ final long targetNodeEventUID,
final String targetNodeID,
final SimpleString name,
final Queue queue,
@@ -122,6 +126,7 @@
activated,
storageManager);
+
this.discoveryLocator = discoveryLocator;
idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
@@ -130,6 +135,7 @@
this.clusterManager = clusterManager;
+ this.targetNodeEventUID = targetNodeEventUID;
this.targetNodeID = targetNodeID;
this.managementAddress = managementAddress;
this.managementNotificationAddress = managementNotificationAddress;
@@ -149,6 +155,17 @@
protected ClientSessionFactoryInternal createSessionFactory() throws Exception
{
ClientSessionFactoryInternal factory = super.createSessionFactory();
+
+ try
+ {
+ TopologyMember member = clusterManager.getLocalMember();
+ factory.sendNodeAnnounce(member.getUniqueEventID(), clusterManager.getNodeId(), false, member.getConnector().a, member.getConnector().b);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
factory.getConnection().addFailureListener(new FailureListener()
{
@@ -161,6 +178,7 @@
}
}
});
+
return factory;
}
@@ -294,6 +312,7 @@
protected void afterConnect() throws Exception
{
super.afterConnect();
+ ClientSessionFactoryInternal csf = getCurrentFactory();
setupNotificationConsumer();
}
@@ -319,7 +338,7 @@
if (permanently)
{
log.debug("cluster node for bridge " + this.getName() + " is permanently down");
- discoveryLocator.notifyNodeDown(targetNodeID);
+ discoveryLocator.notifyNodeDown(targetNodeEventUID+1, targetNodeID);
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -36,6 +36,7 @@
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
@@ -470,6 +471,14 @@
{
log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
}
+
+ final TopologyMember currentMember = clusterManagerTopology.getMember(nodeUUID.toString());
+
+ if (currentMember == null)
+ {
+ // sanity check only
+ throw new IllegalStateException ("InternalError! The ClusterConnection doesn't know about its own node = " + this);
+ }
serverLocator.setNodeID(nodeUUID.toString());
serverLocator.setIdentity("(main-ClusterConnection::" + server.toString() + ")");
@@ -494,6 +503,26 @@
serverLocator.addClusterTopologyListener(this);
serverLocator.start(server.getExecutorFactory().getExecutor());
+
+ serverLocator.getExecutor().execute(new Runnable(){
+ public void run()
+ {
+ try
+ {
+ ClientSessionFactoryInternal csf = serverLocator.connect();
+
+ log.info(this + "::YYY " + nodeUUID.toString() + " Cluster connection " + ClusterConnectionImpl.this +
+ " connected, sending announce node, connector=" +
+ manager.getLocalMember().getConnector().a + "/" + manager.getLocalMember().getConnector().b);
+
+ csf.sendNodeAnnounce(currentMember.getUniqueEventID(), nodeUUID.toString(), false, manager.getLocalMember().getConnector().a, manager.getLocalMember().getConnector().b);
+ }
+ catch (Exception e)
+ {
+ log.warn("Error on connectin Cluster connection to other nodes", e);
+ }
+ }
+ });
}
if (managementService != null)
@@ -515,7 +544,7 @@
// ClusterTopologyListener implementation ------------------------------------------------------------------
- public void nodeDown(final String nodeID)
+ public void nodeDown(final long eventUID, final String nodeID)
{
if (log.isDebugEnabled())
{
@@ -544,12 +573,11 @@
{
log.error("Failed to close flow record", e);
}
-
- server.getClusterManager().notifyNodeDown(nodeID);
}
}
- public void nodeUP(final String nodeID,
+ public void nodeUP(final long eventUID,
+ final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
@@ -562,19 +590,13 @@
if (nodeID.equals(nodeUUID.toString()))
{
- if (connectorPair.b != null)
- {
if (log.isTraceEnabled())
{
log.trace(this + "::informing about backup to itself, nodeUUID=" + nodeUUID + ", connectorPair=" + connectorPair + " this = " + this);
}
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
- }
return;
}
- // we propagate the node notifications to all cluster topology listeners
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
@@ -615,6 +637,7 @@
{
log.debug(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
}
+ log.info(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
// New node - create a new flow record
@@ -635,7 +658,7 @@
queue = server.createQueue(queueName, queueName, null, true, false);
}
- createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
+ createNewRecord(eventUID, nodeID, connectorPair.a, queueName, queue, true);
}
else
{
@@ -656,7 +679,8 @@
}
}
- private void createNewRecord(final String targetNodeID,
+ private void createNewRecord(final long eventUID,
+ final String targetNodeID,
final TransportConfiguration connector,
final SimpleString queueName,
final Queue queue,
@@ -690,7 +714,7 @@
targetLocator.disableFinalizeCheck();
- MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, targetNodeID, connector, queueName, queue);
+ MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, eventUID, targetNodeID, connector, queueName, queue);
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
manager,
@@ -701,6 +725,7 @@
retryIntervalMultiplier,
maxRetryInterval,
nodeUUID,
+ record.getEventUID(),
record.getTargetNodeID(),
record.getQueueName(),
record.getQueue(),
@@ -742,6 +767,8 @@
{
private BridgeImpl bridge;
+ private final long eventUID;
+
private final String targetNodeID;
private final TransportConfiguration connector;
@@ -761,6 +788,7 @@
private volatile boolean firstReset = false;
public MessageFlowRecordImpl(final ServerLocatorInternal targetLocator,
+ final long eventUID,
final String targetNodeID,
final TransportConfiguration connector,
final SimpleString queueName,
@@ -771,6 +799,7 @@
this.targetNodeID = targetNodeID;
this.connector = connector;
this.queueName = queueName;
+ this.eventUID = eventUID;
}
/* (non-Javadoc)
@@ -802,6 +831,14 @@
{
return address.toString();
}
+
+ /**
+ * @return the eventUID
+ */
+ public long getEventUID()
+ {
+ return eventUID;
+ }
/**
* @return the nodeID
@@ -1034,6 +1071,7 @@
private synchronized void doBindingAdded(final ClientMessage message) throws Exception
{
+ log.info(ClusterConnectionImpl.this + " Adding binding " + message);
if (log.isTraceEnabled())
{
log.trace(ClusterConnectionImpl.this + " Adding binding " + message);
@@ -1091,7 +1129,7 @@
// hops is too high
// or there are multiple cluster connections for the same address
- ClusterConnectionImpl.log.warn("Remote queue binding " + clusterName +
+ ClusterConnectionImpl.log.warn(this + "::Remote queue binding " + clusterName +
" has already been bound in the post office. Most likely cause for this is you have a loop " +
"in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -100,6 +100,8 @@
private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
private final Topology topology = new Topology(this);
+
+ private TopologyMember localMember;
private volatile ServerLocatorInternal backupServerLocator;
@@ -168,6 +170,16 @@
{
return "ClusterManagerImpl[server=" + server + "]@" + System.identityHashCode(this);
}
+
+ public TopologyMember getLocalMember()
+ {
+ return localMember;
+ }
+
+ public String getNodeId()
+ {
+ return nodeUUID.toString();
+ }
public synchronized void start() throws Exception
{
@@ -183,11 +195,41 @@
deployBroadcastGroup(config);
}
+ String connectorName = null;
+
for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
{
- deployClusterConnection(config);
+ if (connectorName == null)
+ {
+ connectorName = config.getConnectorName();
+ }
+ else if (!connectorName.equals(config.getConnectorName()))
+ {
+ throw new IllegalStateException("Using multiple connector names on cluster connections it's "
+ + "not supported at this time");
+ }
+
}
+ if (connectorName != null)
+ {
+ TransportConfiguration nodeConnector = configuration.getConnectorConfigurations().get(connectorName);
+ if (nodeConnector == null)
+ {
+ log.warn("No connecor with name '" + connectorName +
+ "'. The cluster connection will not be deployed.");
+ return;
+ }
+
+ // Now announce presence
+ announceNode(nodeConnector);
+
+ for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
+ {
+ deployClusterConnection(config);
+ }
+ }
+
}
for (BridgeConfiguration config : configuration.getBridgeConfigurations())
@@ -195,13 +237,6 @@
deployBridge(config);
}
- // Now announce presence
-
- if (clusterConnections.size() > 0)
- {
- announceNode();
- }
-
started = true;
}
@@ -264,81 +299,26 @@
clusterConnections.clear();
}
- public void notifyNodeDown(String nodeID)
+ public void nodeAnnounced(final long uniqueEventID,
+ final String nodeID,
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final boolean backup)
{
- if (nodeID.equals(nodeUUID.toString()))
- {
- return;
- }
-
- log.debug(this + "::removing nodeID=" + nodeID, new Exception("trace"));
-
- topology.removeMember(nodeID);
-
- }
-
- public void notifyNodeUp(final String nodeID,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last,
- final boolean nodeAnnounce)
- {
if (log.isDebugEnabled())
{
- log.debug(this + "::NodeUp " + nodeID + connectorPair + ", nodeAnnounce=" + nodeAnnounce);
+ log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
}
- TopologyMember member = new TopologyMember(connectorPair);
- boolean updated = topology.addMember(nodeID, member, last);
-
- if (!updated)
+ TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
+ newMember.setUniqueEventID(uniqueEventID);
+ if (backup)
{
- if (log.isDebugEnabled())
- {
- log.debug(this + " ignored notifyNodeUp on nodeID=" +
- nodeID +
- " pair=" +
- connectorPair +
- " as the topology already knew about it");
- }
- return;
+ topology.updateBackup(nodeID, new TopologyMember(connectorPair.a, connectorPair.b));
}
-
- if (log.isDebugEnabled())
+ else
{
- log.debug(this + " received notifyNodeUp nodeID=" +
- nodeID +
- " connectorPair=" +
- connectorPair +
- ", nodeAnnounce=" +
- nodeAnnounce +
- ", last=" +
- last);
+ topology.updateMember(uniqueEventID, nodeID, newMember);
}
-
- // if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster
- // connections.
- if (nodeAnnounce)
- {
- if (log.isDebugEnabled())
- {
- log.debug("Informing " + nodeID + " to " + clusterConnections.toString());
- }
- for (ClusterConnection clusterConnection : clusterConnections.values())
- {
- if (log.isTraceEnabled())
- {
- log.trace(this + " information clusterConnection=" +
- clusterConnection +
- " nodeID=" +
- nodeID +
- " connectorPair=" +
- connectorPair +
- " last=" +
- last);
- }
- clusterConnection.nodeUP(nodeID, connectorPair, last);
- }
- }
}
public void flushExecutor()
@@ -347,7 +327,8 @@
executor.execute(future);
if (!future.await(10000))
{
- server.threadDump("Couldn't flush ClusterManager executor (" + this + ") in 10 seconds, verify your thread pool size");
+ server.threadDump("Couldn't flush ClusterManager executor (" + this +
+ ") in 10 seconds, verify your thread pool size");
}
}
@@ -405,9 +386,8 @@
TopologyMember member = topology.getMember(nodeID);
// swap backup as live and send it to everybody
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(member.getConnector().b,
- null));
- topology.addMember(nodeID, member, false);
+ member = new TopologyMember(member.getConnector().b, null);
+ topology.updateAsLive(nodeID, member);
if (backupServerLocator != null)
{
@@ -460,7 +440,7 @@
}
}
- topology.sendMemberToListeners(nodeID, member);
+ topology.sendMember(nodeID);
}
}
@@ -496,43 +476,21 @@
this.clusterLocators.remove(serverLocator);
}
- private synchronized void announceNode()
+ private synchronized void announceNode(final TransportConfiguration nodeConnector)
{
- // TODO does this really work with more than one cluster connection? I think not
-
- // Just take the first one for now
- ClusterConnection cc = clusterConnections.values().iterator().next();
-
String nodeID = server.getNodeID().toString();
-
- TopologyMember member = topology.getMember(nodeID);
-
- if (member == null)
+
+ if (backup)
{
- if (backup)
- {
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null,
- cc.getConnector()));
- }
- else
- {
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(),
- null));
- }
-
- topology.addMember(nodeID, member, false);
+ localMember = new TopologyMember(null, nodeConnector);
}
else
{
- if (backup)
- {
- // pair.b = cc.getConnector();
- }
- else
- {
- // pair.a = cc.getConnector();
- }
+ localMember = new TopologyMember(nodeConnector, null);
}
+
+ System.out.println("Adding local member " + localMember);
+ topology.updateAsLive(nodeID, localMember);
}
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
@@ -955,9 +913,14 @@
ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
if (backupSessionFactory != null)
{
+ System.out.println("announcing " + System.currentTimeMillis());
backupSessionFactory.getConnection()
.getChannel(0, -1)
- .send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector));
+ .send(new NodeAnnounceMessage(System.currentTimeMillis(),
+ nodeUUID.toString(),
+ true,
+ connector,
+ null));
log.info("backup announced");
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -14,6 +14,7 @@
package org.hornetq.core.server.cluster.impl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.server.cluster.ClusterManager;
/**
@@ -28,5 +29,9 @@
void addClusterLocator(ServerLocatorInternal locator);
void removeClusterLocator(ServerLocatorInternal locator);
+
+ TopologyMember getLocalMember();
+
+ String getNodeId();
}
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -354,11 +354,10 @@
if(nodeManager.isBackupLive())
{
- Thread.sleep(configuration.getFailbackDelay());
//looks like we've failed over at some point need to inform that we are the backup so when the current live
// goes down they failover to us
clusterManager.announceBackup();
- Thread.sleep(configuration.getFailbackDelay());
+ //Thread.sleep(configuration.getFailbackDelay());
}
nodeManager.startLiveNode();
@@ -2004,7 +2003,7 @@
public String toString()
{
- return "HornetQServerImpl::" + (identity == null ? "" : (identity + ", ")) + (nodeManager != null ? ("serverUUID=" + nodeManager.getUUID()) : "");
+ return "HornetQServerImpl::" + (identity == null ? "" : (identity + ", "))/* + (nodeManager != null ? ("serverUUID=" + nodeManager.getUUID()) : "")*/;
}
// Inner classes
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -83,12 +83,12 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
*/
-public class ServerSessionImpl implements ServerSession , FailureListener
+public class ServerSessionImpl implements ServerSession, FailureListener
{
// Constants -----------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
// Static -------------------------------------------------------------------------------
@@ -147,14 +147,14 @@
private volatile SimpleString defaultAddress;
private volatile int timeoutSeconds;
-
+
private Map<String, String> metaData;
-
+
private OperationContext sessionContext;
// Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
- private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
-
+ private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
+
private long creationTime = System.currentTimeMillis();
// Constructors ---------------------------------------------------------------------------------
@@ -244,7 +244,6 @@
this.sessionContext = sessionContext;
}
-
public String getUsername()
{
return username;
@@ -269,8 +268,9 @@
{
return remotingConnection.getID();
}
-
- public Set<ServerConsumer> getServerConsumers() {
+
+ public Set<ServerConsumer> getServerConsumers()
+ {
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
return Collections.unmodifiableSet(consumersClone);
}
@@ -316,7 +316,7 @@
}
remotingConnection.removeFailureListener(this);
-
+
callback.closed();
}
@@ -394,6 +394,8 @@
}
Queue queue = server.createQueue(address, name, filterString, durable, temporary);
+
+ System.out.println("Created queue " + queue + " / address=" + address + " on " + server);
if (temporary)
{
@@ -402,7 +404,7 @@
// dies. It does not mean it will get deleted automatically when the
// session is closed.
// It is up to the user to delete the queue when finished with it
-
+
TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(postOffice, name, queue);
remotingConnection.addCloseListener(cleaner);
@@ -411,8 +413,7 @@
tempQueueCleannerUppers.put(name, cleaner);
}
}
-
-
+
/**
* For test cases only
* @return
@@ -427,7 +428,7 @@
private final PostOffice postOffice;
private final SimpleString bindingName;
-
+
private final Queue queue;
TempQueueCleanerUpper(final PostOffice postOffice, final SimpleString bindingName, final Queue queue)
@@ -435,7 +436,7 @@
this.postOffice = postOffice;
this.bindingName = bindingName;
-
+
this.queue = queue;
}
@@ -443,15 +444,15 @@
{
try
{
- if (log.isDebugEnabled())
- {
- log.debug("deleting temporary queue " + bindingName);
- }
+ if (log.isDebugEnabled())
+ {
+ log.debug("deleting temporary queue " + bindingName);
+ }
if (postOffice.getBinding(bindingName) != null)
{
postOffice.removeBinding(bindingName);
}
-
+
queue.deleteAllReferences();
}
catch (Exception e)
@@ -469,7 +470,7 @@
{
run();
}
-
+
public String toString()
{
return "Temporary Cleaner for queue " + bindingName;
@@ -489,11 +490,11 @@
server.destroyQueue(name, this);
TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(name);
-
+
if (cleaner != null)
{
remotingConnection.removeCloseListener(cleaner);
-
+
remotingConnection.removeFailureListener(cleaner);
}
}
@@ -576,8 +577,8 @@
public void acknowledge(final long consumerID, final long messageID) throws Exception
{
ServerConsumer consumer = consumers.get(consumerID);
-
- consumer.acknowledge(autoCommitAcks, tx, messageID);
+
+ consumer.acknowledge(autoCommitAcks, tx, messageID);
}
public void individualAcknowledge(final long consumerID, final long messageID) throws Exception
@@ -935,7 +936,7 @@
throw new HornetQXAException(XAException.XAER_PROTO,
"Cannot prepare transaction, it is suspended " + xid);
}
- else if(theTx.getState() == Transaction.State.PREPARED)
+ else if (theTx.getState() == Transaction.State.PREPARED)
{
log.info("ignoring prepare on xid as already called :" + xid);
}
@@ -966,7 +967,7 @@
public void xaSetTimeout(final int timeout)
{
timeoutSeconds = timeout;
- if(tx != null)
+ if (tx != null)
{
tx.setTimeout(timeout);
}
@@ -981,18 +982,18 @@
{
setStarted(false);
}
-
+
public void waitContextCompletion()
{
OperationContext formerCtx = storageManager.getContext();
-
+
try
{
try
{
if (!storageManager.waitOnOperations(10000))
{
- log.warn("Couldn't finish context execution in 10 seconds", new Exception ("warning"));
+ log.warn("Couldn't finish context execution in 10 seconds", new Exception("warning"));
}
}
catch (Exception e)
@@ -1009,7 +1010,7 @@
public void close(final boolean failed)
{
OperationContext formerCtx = storageManager.getContext();
-
+
try
{
storageManager.setContext(sessionContext);
@@ -1019,7 +1020,7 @@
public void onError(int errorCode, String errorMessage)
{
}
-
+
public void done()
{
try
@@ -1071,9 +1072,9 @@
{
// need to create the LargeMessage before continue
long id = storageManager.generateUniqueID();
-
+
LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
-
+
if (currentLargeMessage != null)
{
ServerSessionImpl.log.warn("Replacing incomplete LargeMessage with ID=" + currentLargeMessage.getMessageID());
@@ -1085,7 +1086,7 @@
public void send(final ServerMessage message, final boolean direct) throws Exception
{
long id = storageManager.generateUniqueID();
-
+
SimpleString address = message.getAddress();
message.setMessageID(id);
@@ -1111,7 +1112,6 @@
log.trace("send(message=" + message + ", direct=" + direct + ") being called");
}
-
if (message.getAddress().equals(managementAddress))
{
// It's a management message
@@ -1129,7 +1129,10 @@
}
}
- public void sendContinuations(final int packetSize, final long messageBodySize, final byte[] body, final boolean continues) throws Exception
+ public void sendContinuations(final int packetSize,
+ final long messageBodySize,
+ final byte[] body,
+ final boolean continues) throws Exception
{
if (currentLargeMessage == null)
{
@@ -1144,7 +1147,7 @@
if (!continues)
{
currentLargeMessage.releaseResources();
-
+
if (messageBodySize >= 0)
{
currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
@@ -1178,7 +1181,6 @@
consumer.setTransferring(transferring);
}
}
-
public void addMetaData(String key, String data)
{
@@ -1198,7 +1200,7 @@
}
return data;
}
-
+
public String[] getTargetAddresses()
{
Map<SimpleString, Pair<UUID, AtomicLong>> copy = cloneTargetAddresses();
@@ -1238,7 +1240,7 @@
public void describeProducersInfo(JSONArray array) throws Exception
{
Map<SimpleString, Pair<UUID, AtomicLong>> targetCopy = cloneTargetAddresses();
-
+
for (Map.Entry<SimpleString, Pair<UUID, AtomicLong>> entry : targetCopy.entrySet())
{
JSONObject producerInfo = new JSONObject();
@@ -1251,7 +1253,6 @@
}
}
-
// FailureListener implementation
// --------------------------------------------------------------------
@@ -1271,7 +1272,6 @@
}
}
-
// Public
// ----------------------------------------------------------------------------
@@ -1337,7 +1337,7 @@
toCancel.addAll(consumer.cancelRefs(false, lastMessageAsDelived, theTx));
}
-
+
for (MessageReference ref : toCancel)
{
ref.getQueue().cancel(theTx, ref);
@@ -1379,12 +1379,12 @@
}
postOffice.route(msg, routingContext, direct);
-
+
Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
-
+
if (value == null)
{
- targetAddressInfos.put(msg.getAddress(), new Pair<UUID,AtomicLong>(msg.getUserID(), new AtomicLong(1)));
+ targetAddressInfos.put(msg.getAddress(), new Pair<UUID, AtomicLong>(msg.getUserID(), new AtomicLong(1)));
}
else
{
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -444,6 +444,7 @@
// Now we will simulate a failure of the bridge connection between server0 and server1
Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
+ assertNotNull(bridge);
RemotingConnection forwardingConnection = getForwardingConnection(bridge);
InVMConnector.failOnCreateConnection = true;
InVMConnector.numberOfFailures = reconnectAttempts - 1;
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -88,7 +88,7 @@
private static final long WAIT_TIMEOUT = 10000;
- private static final long TIMEOUT_START_SERVER = 500;
+ private static final long TIMEOUT_START_SERVER = 10;
@Override
protected void setUp() throws Exception
@@ -385,8 +385,8 @@
{
if (hornetQServer != null)
{
- out.println(clusterDescription(hornetQServer));
- out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration()
+ System.out.println(clusterDescription(hornetQServer));
+ System.out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration()
.getManagementNotificationAddress()
.toString()));
}
@@ -2023,16 +2023,20 @@
for (int node : nodes)
{
log.info("#test start node " + node);
-// if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
-// {
-// Thread.sleep(TIMEOUT_START_SERVER);
-// }
- Thread.sleep(TIMEOUT_START_SERVER);
+ if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
+ {
+ Thread.sleep(TIMEOUT_START_SERVER);
+ }
timeStarts[node] = System.currentTimeMillis();
servers[node].setIdentity("server " + node);
ClusterTestBase.log.info("starting server " + servers[node]);
servers[node].start();
+
+// for (int i = 0 ; i <= node; i++)
+// {
+// System.out.println(servers[node].getClusterManager().getTopology().describe());
+// }
ClusterTestBase.log.info("started server " + servers[node]);
@@ -2068,13 +2072,13 @@
{
try
{
-// if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
-// {
-// // We can't stop and start a node too fast (faster than what the Topology could realize about this
-// Thread.sleep(TIMEOUT_START_SERVER);
-// }
+ if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
+ {
+ // We can't stop and start a node too fast (faster than what the Topology could realize about this
+ Thread.sleep(TIMEOUT_START_SERVER);
+ }
- Thread.sleep(TIMEOUT_START_SERVER);
+ //Thread.sleep(TIMEOUT_START_SERVER);
timeStarts[node] = System.currentTimeMillis();
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -97,6 +97,8 @@
setupCluster();
startServers(0, 1, 2, 3, 4);
+
+ Thread.sleep(1000);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -148,28 +148,33 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
startServers(0, 1, 2);
+
+ waitForTopology(servers[0], 3);
+ waitForTopology(servers[1], 3);
+ waitForTopology(servers[2], 3);
+
+ for (int i = 0 ; i < 3; i++)
+ {
+ System.out.println("top[" + i + "]=" + servers[i].getClusterManager().getTopology().describe());
+ }
- for (int i = 0; i < 10; i++)
- log.info("****************************");
for (int i = 0; i <= 2; i++)
{
log.info("*************************************\n " + servers[i] +
" topology:\n" +
servers[i].getClusterManager().getTopology().describe());
}
- for (int i = 0; i < 10; i++)
- log.info("****************************");
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
- createQueue(0, "queues.testaddress", "queue0", null, false);
- createQueue(1, "queues.testaddress", "queue0", null, false);
- createQueue(2, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "manequeue0", null, false);
+ createQueue(1, "queues.testaddress", "manequeue0", null, false);
+ createQueue(2, "queues.testaddress", "manequeue0", null, false);
- addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue0", null);
- addConsumer(2, 2, "queue0", null);
+ addConsumer(0, 0, "manequeue0", null);
+ addConsumer(1, 1, "manequeue0", null);
+ addConsumer(2, 2, "manequeue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
@@ -180,19 +185,52 @@
waitForBindings(2, "queues.testaddress", 2, 2, false);
}
+
+ public void testSimple_TwoNodes() throws Exception
+ {
+ setupServer(0, false, isNetty());
+ setupServer(1, false, isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
+
+ startServers(0, 1);
+
+ for (int i = 0; i <= 1; i++)
+ {
+ log.info("*************************************\n " + servers[i] +
+ " topology:\n" +
+ servers[i].getClusterManager().getTopology().describe());
+ }
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues.testaddress", "manequeue0", null, false);
+ createQueue(1, "queues.testaddress", "manequeue0", null, false);
+
+ addConsumer(0, 0, "manequeue0", null);
+ addConsumer(1, 1, "manequeue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, false);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+ closeAllConsumers();
+
+ }
+
static int loopNumber;
public void _testLoop() throws Throwable
{
- for (int i = 0 ; i < 1000; i++)
+ for (int i = 0 ; i < 10; i++)
{
loopNumber = i;
log.info("#test " + i);
- testSimple2();
- if (i + 1 < 1000)
- {
- tearDown();
- setUp();
- }
+ testSimple();
+ tearDown();
+ setUp();
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.cluster.distribution;
+import java.util.Map;
+
import org.hornetq.core.logging.Logger;
/**
@@ -136,14 +138,20 @@
Thread.currentThread().setName("ThreadOnTestRestartTest");
startServers(0, 1);
waitForTopology(servers[0], 2);
+
+ System.out.println(servers[0].getClusterManager().getTopology().describe());
+ System.out.println(servers[1].getClusterManager().getTopology().describe());
waitForTopology(servers[1], 2);
- for (int i = 0; i < 5; i++)
+
+ for (int i = 0; i < 10; i++)
{
+ Thread.sleep(10);
log.info("Sleep #test " + i);
log.info("#stop #test #" + i);
- Thread.sleep(500);
stopServers(1);
+
+ System.out.println(servers[0].getClusterManager().getTopology().describe());
waitForTopology(servers[0], 1, 2000);
log.info("#start #test #" + i);
startServers(1);
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -433,8 +433,9 @@
this.latch = latch;
}
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+ public void nodeUP(final long uniqueEventID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
+ System.out.println("Received " + connectorPair + " uniqueEvent=" + uniqueEventID);
if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
liveNode.add(connectorPair.a.getName());
@@ -447,7 +448,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
//To change body of implemented methods use File | Settings | File Templates.
}
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -219,7 +219,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+ public void nodeUP(final long uniqueEventID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
@@ -233,7 +233,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -25,7 +25,6 @@
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
@@ -91,6 +90,7 @@
ClientSession session = sendAndConsume(sf, true);
System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
+ Thread.sleep(500);
servers.get(0).crash(session);
int liveAfter0 = waitForNewLive(10000, true, servers, 1, 2);
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -204,7 +204,8 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last)
{
@@ -222,7 +223,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
if (nodes.contains(nodeID))
{
@@ -278,7 +279,8 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last)
{
@@ -289,7 +291,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
if (nodes.contains(nodeID))
{
@@ -350,7 +352,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID, String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last)
{
@@ -361,7 +363,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
if (nodes.contains(nodeID))
{
@@ -432,7 +434,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID, String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last)
{
@@ -443,7 +445,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
if (nodes.contains(nodeID))
{
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2011-08-29 16:20:36 UTC (rev 11234)
@@ -296,7 +296,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID, String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last)
{
@@ -312,7 +312,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
// To change body of implemented methods use File | Settings | File Templates.
}
13 years, 3 months
JBoss hornetq SVN: r11233 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-29 12:16:58 -0400 (Mon, 29 Aug 2011)
New Revision: 11233
Removed:
branches/Branch_2_2_EAP_cluster_clean2/
Log:
removing old branch
13 years, 3 months
JBoss hornetq SVN: r11232 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-29 12:16:38 -0400 (Mon, 29 Aug 2011)
New Revision: 11232
Added:
branches/Branch_2_2_EAP_cluster_clean3/
Log:
Creating a new branch for clustering work
13 years, 3 months
JBoss hornetq SVN: r11231 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-26 09:21:29 -0400 (Fri, 26 Aug 2011)
New Revision: 11231
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
Log:
tweak on test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-08-26 06:41:31 UTC (rev 11230)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-08-26 13:21:29 UTC (rev 11231)
@@ -59,6 +59,7 @@
public void testMultipleFailovers2LiveServers() throws Exception
{
+ // TODO: remove these sleeps
NodeManager nodeManager1 = new InVMNodeManager();
NodeManager nodeManager2 = new InVMNodeManager();
createLiveConfig(nodeManager1, 0, 3, 4, 5);
@@ -109,23 +110,31 @@
locator2.close();
if (liveAfter0 == 2)
{
+ Thread.sleep(500);
servers.get(1).stop();
+ Thread.sleep(500);
servers.get(2).stop();
}
else
{
+ Thread.sleep(500);
servers.get(2).stop();
+ Thread.sleep(500);
servers.get(1).stop();
}
if (liveAfter3 == 4)
{
+ Thread.sleep(500);
servers.get(5).stop();
+ Thread.sleep(500);
servers.get(4).stop();
}
else
{
+ Thread.sleep(500);
servers.get(4).stop();
+ Thread.sleep(500);
servers.get(5).stop();
}
}
13 years, 4 months
JBoss hornetq SVN: r11230 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-26 02:41:31 -0400 (Fri, 26 Aug 2011)
New Revision: 11230
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
I'm having issues with the versioning of Topic Members. Until I implement a better algorithm, I will keep a sleep before each server is started in the test suite.
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-26 03:53:29 UTC (rev 11229)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-26 06:41:31 UTC (rev 11230)
@@ -88,7 +88,7 @@
private static final long WAIT_TIMEOUT = 10000;
- private static final long TIMEOUT_START_SERVER = 1000;
+ private static final long TIMEOUT_START_SERVER = 500;
@Override
protected void setUp() throws Exception
@@ -2023,10 +2023,11 @@
for (int node : nodes)
{
log.info("#test start node " + node);
- if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
- {
- Thread.sleep(TIMEOUT_START_SERVER);
- }
+// if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
+// {
+// Thread.sleep(TIMEOUT_START_SERVER);
+// }
+ Thread.sleep(TIMEOUT_START_SERVER);
timeStarts[node] = System.currentTimeMillis();
servers[node].setIdentity("server " + node);
@@ -2067,11 +2068,14 @@
{
try
{
- if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
- {
- // We can't stop and start a node too fast (faster than what the Topology could realize about this
- Thread.sleep(TIMEOUT_START_SERVER);
- }
+// if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
+// {
+// // We can't stop and start a node too fast (faster than what the Topology could realize about this
+// Thread.sleep(TIMEOUT_START_SERVER);
+// }
+
+ Thread.sleep(TIMEOUT_START_SERVER);
+
timeStarts[node] = System.currentTimeMillis();
ClusterTestBase.log.info("stopping server " + node);
13 years, 4 months
JBoss hornetq SVN: r11229 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-25 23:53:29 -0400 (Thu, 25 Aug 2011)
New Revision: 11229
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
Log:
Re-enabling the back-off until I can figure out how to avoid it in Topology
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-26 03:16:31 UTC (rev 11228)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-26 03:53:29 UTC (rev 11229)
@@ -36,8 +36,7 @@
public class Topology implements Serializable
{
- // TODO: remove the backof from this class. It's probably not needed any longer
- // private static final int BACKOF_TIMEOUT = 500;
+ private static final int BACKOF_TIMEOUT = 500;
private static final long serialVersionUID = -9037171688692471371L;
@@ -45,7 +44,7 @@
private static final Logger log = Logger.getLogger(Topology.class);
- // private transient HashMap<String, Pair<Long, Integer>> mapBackof = new HashMap<String, Pair<Long, Integer>>();
+ private transient HashMap<String, Pair<Long, Integer>> mapBackof = new HashMap<String, Pair<Long, Integer>>();
private Executor executor = null;
@@ -113,10 +112,10 @@
if (currentMember == null)
{
- /*if (!testBackof(nodeId))
+ if (!testBackof(nodeId))
{
return false;
- } */
+ }
if (Topology.log.isDebugEnabled())
{
@@ -144,20 +143,20 @@
if (hasChanged("a", memberToSend.getConnector().a, memberInput.getConnector().a))
{
- /*if (!replaced && !testBackof(nodeId))
+ if (!replaced && !testBackof(nodeId))
{
return false;
- }*/
+ }
memberToSend = new TopologyMember(memberInput.getConnector().a, memberToSend.getConnector().b);
replaced = true;
}
if (hasChanged("b", memberToSend.getConnector().b, memberInput.getConnector().b))
{
- /*if (!replaced && !testBackof(nodeId))
+ if (!replaced && !testBackof(nodeId))
{
return false;
- }*/
+ }
memberToSend = new TopologyMember(memberToSend.getConnector().a, memberInput.getConnector().b);
replaced = true;
}
@@ -228,7 +227,7 @@
* @param nodeId
* @param backOfData
*/
- /*private boolean testBackof(final String nodeId)
+ private boolean testBackof(final String nodeId)
{
Pair<Long, Integer> backOfData = mapBackof.get(nodeId);
@@ -262,7 +261,7 @@
}
return true;
- } */
+ }
/**
* @return
@@ -283,21 +282,21 @@
synchronized (this)
{
-// Pair<Long, Integer> value = mapBackof.get(nodeId);
-//
-// if (value == null)
-// {
-// value = new Pair<Long, Integer>(0l, 0);
-// mapBackof.put(nodeId, value);
-// }
-//
-// value.a = System.currentTimeMillis();
-//
-// if (System.currentTimeMillis() - value.a > BACKOF_TIMEOUT)
-// {
-// value.b = 0;
-// }
+ Pair<Long, Integer> value = mapBackof.get(nodeId);
+ if (value == null)
+ {
+ value = new Pair<Long, Integer>(0l, 0);
+ mapBackof.put(nodeId, value);
+ }
+
+ value.a = System.currentTimeMillis();
+
+ if (System.currentTimeMillis() - value.a > BACKOF_TIMEOUT)
+ {
+ value.b = 0;
+ }
+
member = mapTopology.remove(nodeId);
}
13 years, 4 months
JBoss hornetq SVN: r11228 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-25 23:16:31 -0400 (Thu, 25 Aug 2011)
New Revision: 11228
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
tweak
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-26 03:08:55 UTC (rev 11227)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-26 03:16:31 UTC (rev 11228)
@@ -87,6 +87,8 @@
TransportConstants.DEFAULT_PORT + 9, };
private static final long WAIT_TIMEOUT = 10000;
+
+ private static final long TIMEOUT_START_SERVER = 1000;
@Override
protected void setUp() throws Exception
@@ -2021,9 +2023,9 @@
for (int node : nodes)
{
log.info("#test start node " + node);
- if (System.currentTimeMillis() - timeStarts[node] < 100)
+ if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
{
- Thread.sleep(100);
+ Thread.sleep(TIMEOUT_START_SERVER);
}
timeStarts[node] = System.currentTimeMillis();
@@ -2065,10 +2067,10 @@
{
try
{
- if (System.currentTimeMillis() - timeStarts[node] < 100)
+ if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
{
// We can't stop and start a node too fast (faster than what the Topology could realize about this
- Thread.sleep(100);
+ Thread.sleep(TIMEOUT_START_SERVER);
}
timeStarts[node] = System.currentTimeMillis();
13 years, 4 months
JBoss hornetq SVN: r11227 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-25 23:08:55 -0400 (Thu, 25 Aug 2011)
New Revision: 11227
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
Log:
tweak to a test after a small (expected) failure
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-08-26 02:41:29 UTC (rev 11226)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-08-26 03:08:55 UTC (rev 11227)
@@ -142,6 +142,7 @@
{
log.info("Sleep #test " + i);
log.info("#stop #test #" + i);
+ Thread.sleep(500);
stopServers(1);
waitForTopology(servers[0], 1, 2000);
log.info("#start #test #" + i);
13 years, 4 months