Author: clebert.suconic(a)jboss.com
Date: 2011-08-11 22:20:12 -0400 (Thu, 11 Aug 2011)
New Revision: 11194
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
Improving testsuite
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-12 00:05:39 UTC (rev 11193)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-12 02:20:12 UTC (rev 11194)
@@ -1137,9 +1137,19 @@
super.finalize();
}
+
+ public void cleanup()
+ {
+ doClose(false);
+ }
public void close()
{
+ doClose(true);
+ }
+
+ protected void doClose(final boolean sendClose)
+ {
if (closed)
{
if (log.isDebugEnabled())
@@ -1176,7 +1186,14 @@
for (ClientSessionFactory factory : clonedFactory)
{
- factory.close();
+ if (sendClose)
+ {
+ factory.close();
+ }
+ else
+ {
+ factory.cleanup();
+ }
}
factories.clear();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-08-12 00:05:39 UTC (rev 11193)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-08-12 02:20:12 UTC (rev 11194)
@@ -39,6 +39,8 @@
void setNodeID(String nodeID);
String getNodeID();
+
+ void cleanup();
ClientSessionFactory connect() throws Exception;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-12 00:05:39 UTC (rev 11193)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-12 02:20:12 UTC (rev 11194)
@@ -104,7 +104,7 @@
private volatile ServerLocatorInternal backupServerLocator;
- private final Set<ServerLocator> clusterLocators = new ConcurrentHashSet<ServerLocator>();
+ private final Set<ServerLocatorInternal> clusterLocators = new ConcurrentHashSet<ServerLocatorInternal>();
private final Executor executor;
@@ -143,21 +143,21 @@
this.clustered = clustered;
}
-
+
public String describe()
{
StringWriter str = new StringWriter();
PrintWriter out = new PrintWriter(str);
-
+
out.println("Information on " + this);
out.println("*******************************************************");
out.println("Topology: " + topology.describe("Toopology on " + this));
-
+
for (ClusterConnection conn : this.clusterConnections.values())
{
out.println(conn.describe());
}
-
+
out.println("*******************************************************");
return str.toString();
@@ -167,7 +167,7 @@
{
return "ClusterManagerImpl[server=" + server + "]@" + System.identityHashCode(this);
}
-
+
public synchronized void start() throws Exception
{
if (started)
@@ -204,56 +204,60 @@
started = true;
}
- public synchronized void stop() throws Exception
+ public void stop() throws Exception
{
- if (!started)
+ synchronized (this)
{
- return;
- }
+ if (!started)
+ {
+ return;
+ }
- if (clustered)
- {
- for (BroadcastGroup group : broadcastGroups.values())
+ if (clustered)
{
- group.stop();
- managementService.unregisterBroadcastGroup(group.getName());
+ for (BroadcastGroup group : broadcastGroups.values())
+ {
+ group.stop();
+ managementService.unregisterBroadcastGroup(group.getName());
+ }
+
+ broadcastGroups.clear();
+
+ for (ClusterConnection clusterConnection : clusterConnections.values())
+ {
+ clusterConnection.stop();
+ managementService.unregisterCluster(clusterConnection.getName().toString());
+ }
+
}
- broadcastGroups.clear();
-
- for (ClusterConnection clusterConnection : clusterConnections.values())
+ for (Bridge bridge : bridges.values())
{
- clusterConnection.stop();
- managementService.unregisterCluster(clusterConnection.getName().toString());
+ bridge.stop();
+ managementService.unregisterBridge(bridge.getName().toString());
}
- }
+ bridges.clear();
- for (Bridge bridge : bridges.values())
- {
- bridge.stop();
- managementService.unregisterBridge(bridge.getName().toString());
+ if (backupServerLocator != null)
+ {
+ backupServerLocator.close();
+ backupServerLocator = null;
+ }
}
- bridges.clear();
-
- if (backupServerLocator != null)
+ for (ServerLocatorInternal clusterLocator : clusterLocators)
{
- backupServerLocator.close();
- backupServerLocator = null;
- }
-
- executor.execute(new Runnable()
- {
- public void run()
+ try
{
- for (ServerLocator clusterLocator : clusterLocators)
- {
- clusterLocator.close();
- }
- clusterLocators.clear();
+ clusterLocator.close();
}
- });
+ catch (Exception e)
+ {
+ log.warn("Error closing serverLocator=" + clusterLocator + ", message=" + e.getMessage(), e);
+ }
+ }
+ clusterLocators.clear();
started = false;
clusterConnections.clear();
@@ -265,9 +269,9 @@
{
return;
}
-
- log.debug(this + "::removing nodeID=" + nodeID, new Exception ("trace"));
+ log.debug(this + "::removing nodeID=" + nodeID, new Exception("trace"));
+
topology.removeMember(nodeID);
}
@@ -284,22 +288,32 @@
TopologyMember member = new TopologyMember(connectorPair);
boolean updated = topology.addMember(nodeID, member, last);
-
+
if (!updated)
{
if (log.isDebugEnabled())
{
- log.debug(this + " ignored notifyNodeUp on nodeID=" + nodeID + " pair=" + connectorPair + " as the topology already knew about it");
+ log.debug(this + " ignored notifyNodeUp on nodeID=" +
+ nodeID +
+ " pair=" +
+ connectorPair +
+ " as the topology already knew about it");
}
return;
}
if (log.isDebugEnabled())
{
- log.debug(this + " received notifyNodeUp nodeID=" + nodeID + " connectorPair=" + connectorPair +
- ", nodeAnnounce=" + nodeAnnounce + ", last=" + last);
+ log.debug(this + " received notifyNodeUp nodeID=" +
+ nodeID +
+ " connectorPair=" +
+ connectorPair +
+ ", nodeAnnounce=" +
+ nodeAnnounce +
+ ", last=" +
+ last);
}
-
+
// if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster
// connections.
if (nodeAnnounce)
@@ -312,8 +326,14 @@
{
if (log.isTraceEnabled())
{
- log.trace(this + " information clusterConnection=" + clusterConnection +
- " nodeID=" + nodeID + " connectorPair=" + connectorPair + " last=" + last);
+ log.trace(this + " information clusterConnection=" +
+ clusterConnection +
+ " nodeID=" +
+ nodeID +
+ " connectorPair=" +
+ connectorPair +
+ " last=" +
+ last);
}
clusterConnection.nodeUP(nodeID, connectorPair, last);
}
@@ -350,17 +370,17 @@
topology.addClusterTopologyListener(listener);
// We now need to send the current topology to the client
- executor.execute(new Runnable(){
+ executor.execute(new Runnable()
+ {
public void run()
{
topology.sendTopology(listener);
-
+
}
});
}
- public void removeClusterTopologyListener(final ClusterTopologyListener listener,
- final boolean clusterConnection)
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
{
topology.removeClusterTopologyListener(listener);
}
@@ -380,8 +400,9 @@
String nodeID = server.getNodeID().toString();
TopologyMember member = topology.getMember(nodeID);
- //swap backup as live and send it to everybody
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(member.getConnector().b, null));
+ // swap backup as live and send it to everybody
+ member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(member.getConnector().b,
+ null));
topology.addMember(nodeID, member, false);
if (backupServerLocator != null)
@@ -434,7 +455,7 @@
log.warn("unable to start bridge " + bridge.getName(), e);
}
}
-
+
topology.sendMemberToListeners(nodeID, member);
}
}
@@ -460,7 +481,7 @@
log.warn("no cluster connections defined, unable to announce backup");
}
}
-
+
void addClusterLocator(final ServerLocatorInternal serverLocator)
{
this.clusterLocators.add(serverLocator);
@@ -681,7 +702,7 @@
}
serverLocator.setConfirmationWindowSize(config.getConfirmationWindowSize());
-
+
// We are going to manually retry on the bridge in case of failure
serverLocator.setReconnectAttempts(0);
serverLocator.setInitialConnectAttempts(-1);
@@ -693,12 +714,12 @@
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
if (!config.isUseDuplicateDetection())
{
- log.debug("Bridge " + config.getName() +
+ log.debug("Bridge " + config.getName() +
" is configured to not use duplicate detecion, it will send messages synchronously");
}
-
+
clusterLocators.add(serverLocator);
-
+
Bridge bridge = new BridgeImpl(serverLocator,
config.getReconnectAttempts(),
config.getRetryInterval(),
@@ -731,7 +752,7 @@
public void destroyBridge(final String name) throws Exception
{
Bridge bridge;
-
+
synchronized (this)
{
bridge = bridges.remove(name);
@@ -741,7 +762,7 @@
managementService.unregisterBridge(name);
}
}
-
+
bridge.flushExecutor();
}
@@ -790,10 +811,13 @@
"'. The cluster connection will not be deployed.");
return;
}
-
+
if (log.isDebugEnabled())
{
- log.debug(this + " Starting a Discovery Group Cluster Connection, name=" + config.getDiscoveryGroupName() + ", dg=" + dg);
+ log.debug(this + " Starting a Discovery Group Cluster Connection, name=" +
+ config.getDiscoveryGroupName() +
+ ", dg=" +
+ dg);
}
clusterConnection = new ClusterConnectionImpl(this,
@@ -828,7 +852,7 @@
{
TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null ? connectorNameListToArray(config.getStaticConnectors())
: null;
-
+
if (log.isDebugEnabled())
{
log.debug(this + " defining cluster connection towards " + Arrays.toString(tcConfigs));
@@ -869,7 +893,7 @@
if (log.isDebugEnabled())
{
- log.debug("ClusterConnection.start at " + clusterConnection, new Exception ("trace"));
+ log.debug("ClusterConnection.start at " + clusterConnection, new Exception("trace"));
}
clusterConnection.start();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-12 00:05:39 UTC (rev 11193)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-12 02:20:12 UTC (rev 11194)
@@ -2059,6 +2059,7 @@
{
ClusterTestBase.log.info("stopping server " + node);
servers[node].stop();
+ Thread.sleep(500);
ClusterTestBase.log.info("server " + node + " stopped");
}
catch (Exception e)