Author: clebert.suconic
Date: 2011-08-13 03:07:31 -0400 (Sat, 13 Aug 2011)
New Revision: 11202
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
testsuite fixes
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-13
06:36:26 UTC (rev 11201)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-13
07:07:31 UTC (rev 11202)
@@ -661,10 +661,8 @@
final Queue queue,
final boolean start) throws Exception
{
- Topology topology = new Topology(null);
- final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology,
- false,
- connector);
+ final Topology topology = new Topology(null);
+ final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology, false,
connector);
targetLocator.setReconnectAttempts(0);
@@ -689,35 +687,53 @@
{
targetLocator.setRetryInterval(retryInterval);
}
-
+
targetLocator.disableFinalizeCheck();
-
+
targetLocator.connect();
-
+ ClusterTopologyListener listenerOnBridgeTopology = new ClusterTopologyListener()
+ {
- MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator,
targetNodeID, connector, queueName, queue);
+ public void nodeDown(String nodeID)
+ {
+ clusterManagerTopology.removeMember(nodeID);
+ }
- topology.setOwner(record);
-
- // Establish a proxy to the main topology.
- // We are going to listen for adds and removes on the bridges as well
- topology.addClusterTopologyListener(new ClusterTopologyListener(){
+ public void nodeUP(String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
+ boolean last)
+ {
+ clusterManagerTopology.addMember(nodeID, new TopologyMember(connectorPair),
last);
+ }
+ };
+
+ ClusterTopologyListener listenerOnMainTopology = new ClusterTopologyListener()
+ {
public void nodeDown(String nodeID)
{
- clusterManagerTopology.removeMember(nodeID);
+ topology.removeMember(nodeID);
}
public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
boolean last)
{
- clusterManagerTopology.addMember(nodeID,new TopologyMember(connectorPair),
last);
+ topology.addMember(nodeID, new TopologyMember(connectorPair), last);
}
-
- });
+ };
+ // Establish a two-way proxy: each topology forwards its nodeUP/nodeDown events to the other
+ topology.addClusterTopologyListener(listenerOnBridgeTopology);
+
+ clusterManagerTopology.addClusterTopologyListener(listenerOnMainTopology);
+
+
+ MessageFlowRecordImpl record = new MessageFlowRecordImpl(listenerOnMainTopology,
listenerOnBridgeTopology, targetLocator, targetNodeID, connector, queueName, queue);
+
+ topology.setOwner(record);
+
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
manager,
targetLocator,
@@ -761,7 +777,7 @@
bridge.start();
}
}
-
+
// Inner classes
-----------------------------------------------------------------------------------
private class MessageFlowRecordImpl implements MessageFlowRecord
@@ -771,11 +787,11 @@
private final String targetNodeID;
private final TransportConfiguration connector;
-
+
private final ServerLocatorInternal targetLocator;
private final SimpleString queueName;
-
+
private boolean disconnected = false;;
private final Queue queue;
@@ -785,8 +801,14 @@
private volatile boolean isClosed = false;
private volatile boolean firstReset = false;
+
+ private final ClusterTopologyListener listenerOnMainTopology;
+
+ private final ClusterTopologyListener listenerOnBridgeTopology;
- public MessageFlowRecordImpl(final ServerLocatorInternal targetLocator,
+ public MessageFlowRecordImpl(final ClusterTopologyListener listenerOnMainTopology,
+ final ClusterTopologyListener
listenerOnBridgeTopology,
+ final ServerLocatorInternal targetLocator,
final String targetNodeID,
final TransportConfiguration connector,
final SimpleString queueName,
@@ -797,6 +819,8 @@
this.targetNodeID = targetNodeID;
this.connector = connector;
this.queueName = queueName;
+ this.listenerOnMainTopology = listenerOnMainTopology;
+ this.listenerOnBridgeTopology = listenerOnBridgeTopology;
}
/* (non-Javadoc)
@@ -818,7 +842,7 @@
firstReset +
"]";
}
-
+
public void serverDisconnected()
{
this.disconnected = true;
@@ -872,17 +896,20 @@
{
log.trace("Stopping bridge " + bridge);
}
+
+ clusterManagerTopology.removeClusterTopologyListener(listenerOnMainTopology);
+
targetLocator.getTopology().removeClusterTopologyListener(listenerOnBridgeTopology);
isClosed = true;
clearBindings();
-
+
if (disconnected)
{
bridge.disconnect();
}
bridge.stop();
-
+
bridge.getExecutor().execute(new Runnable()
{
public void run()
@@ -1356,9 +1383,7 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for
" + Arrays.toString(tcConfigs));
}
- return new ServerLocatorImpl(clusterManagerTopology,
- true,
- tcConfigs);
+ return new ServerLocatorImpl(clusterManagerTopology, true, tcConfigs);
}
else
{
@@ -1388,9 +1413,7 @@
public ServerLocatorInternal createServerLocator()
{
- return new ServerLocatorImpl(clusterManagerTopology,
- true,
- dg);
+ return new ServerLocatorImpl(clusterManagerTopology, true, dg);
}
}
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-13
06:36:26 UTC (rev 11201)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-13
07:07:31 UTC (rev 11202)
@@ -2031,7 +2031,7 @@
* we need to wait a little while between server start up to allow the server to
communicate in some order.
* This is to avoid split brain on startup
* */
- Thread.sleep(1000);
+ Thread.sleep(500);
}
}