Author: clebert.suconic(a)jboss.com
Date: 2011-07-29 00:22:24 -0400 (Fri, 29 Jul 2011)
New Revision: 11066
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Log:
test fixes
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-28
17:44:46 UTC (rev 11065)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-29
04:22:24 UTC (rev 11066)
@@ -68,8 +68,6 @@
private String identity;
- private final Set<ClusterTopologyListener> topologyListeners = new
HashSet<ClusterTopologyListener>();
-
private Set<ClientSessionFactory> factories = new
HashSet<ClientSessionFactory>();
private TransportConfiguration[] initialConnectors;
@@ -444,6 +442,7 @@
public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration
groupConfiguration)
{
this(new Topology(null), useHA, groupConfiguration, null);
+ topology.setOwner(this);
}
/**
@@ -454,6 +453,7 @@
public ServerLocatorImpl(final boolean useHA, final TransportConfiguration...
transportConfigs)
{
this(new Topology(null), useHA, null, transportConfigs);
+ topology.setOwner(this);
}
/**
@@ -1215,7 +1215,7 @@
}
removed = topology.removeMember(nodeID);
-
+
if (!topology.isEmpty())
{
updateArraysAndPairs();
@@ -1232,13 +1232,6 @@
receivedTopology = false;
}
- if (removed)
- {
- for (ClusterTopologyListener listener : topologyListeners)
- {
- listener.nodeDown(nodeID);
- }
- }
}
public synchronized void notifyNodeUp(final String nodeID,
@@ -1263,7 +1256,7 @@
log.debug("XXX ZZZ NodeUp " + this + "::nodeID=" + nodeID +
", connectorPair=" + connectorPair, new Exception ("trace"));
}
- topology.addMember(nodeID, new TopologyMember(connectorPair));
+ topology.addMember(nodeID, new TopologyMember(connectorPair), last);
TopologyMember actMember = topology.getMember(nodeID);
@@ -1286,11 +1279,6 @@
receivedTopology = true;
}
- for (ClusterTopologyListener listener : topologyListeners)
- {
- listener.nodeUP(nodeID, connectorPair, last);
- }
-
// Notify if waiting on getting topology
notify();
}
@@ -1371,16 +1359,12 @@
public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
- topologyListeners.add(listener);
- if (topology.members() > 0)
- {
- log.debug(this + "::ServerLocatorImpl.addClusterTopologyListener");
- }
+ topology.addClusterTopologyListener(listener);
}
public void removeClusterTopologyListener(final ClusterTopologyListener listener)
{
- topologyListeners.remove(listener);
+ topology.removeClusterTopologyListener(listener);
}
public synchronized void addFactory(ClientSessionFactoryInternal factory)
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-28
17:44:46 UTC (rev 11065)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-29
04:22:24 UTC (rev 11066)
@@ -15,10 +15,14 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
@@ -30,16 +34,13 @@
*/
public class Topology implements Serializable
{
-
- /**
- *
- */
+
private static final long serialVersionUID = -9037171688692471371L;
-
+ private final Set<ClusterTopologyListener> topologyListeners = new
HashSet<ClusterTopologyListener>();
private static final Logger log = Logger.getLogger(Topology.class);
-
+
/** Used to debug operations.
*
* Someone may argue this is not needed. But it's impossible to debg anything
related to topology without knowing what node
@@ -47,13 +48,15 @@
*
* Hence I added some information to locate debugging here.
* */
- private final Object owner;
-
-
+ private volatile Object owner;
+
+ private volatile Executor executor;
+
public Topology(final Object owner)
{
this.owner = owner;
- log.debug("ZZZ III Topology@" +
Integer.toHexString(System.identityHashCode(this)) + " CREATE", new Exception
("trace")); // Delete this line
+ Topology.log.debug("ZZZ III Topology@" +
Integer.toHexString(System.identityHashCode(this)) + " CREATE",
+ new Exception("trace")); // Delete this line
}
/*
@@ -62,89 +65,224 @@
* keys are node IDs
* values are a pair of live/backup transport configurations
*/
- private Map<String, TopologyMember> topology = new ConcurrentHashMap<String,
TopologyMember>();
+ private final Map<String, TopologyMember> topology = new
ConcurrentHashMap<String, TopologyMember>();
- private boolean debug = log.isDebugEnabled();
+ public void setExecutor(Executor executor)
+ {
+ this.executor = executor;
+ }
- public synchronized boolean addMember(String nodeId, TopologyMember member)
+ public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
- boolean replaced = false;
- TopologyMember currentMember = topology.get(nodeId);
- if (debug)
+ if (log.isDebugEnabled())
{
- log.debug(this + "::adding = " + nodeId + ":" +
member.getConnector(), new Exception ("trace"));
- log.debug(describe("Before:"));
+ log.debug(this + "::PPP Adding topology listener " + listener, new
Exception("Trace"));
}
- if(currentMember == null)
+ synchronized (topologyListeners)
{
- replaced = true;
- if (log.isDebugEnabled())
- {
- log.debug("ZZZ " + this + " MEMBER WAS NULL, Add member
nodeId=" + nodeId + " member = " + member + " replaced = " +
replaced + " size = " + topology.size(), new Exception ("trace"));
- }
- topology.put(nodeId, member);
+ topologyListeners.add(listener);
}
- else
+ }
+
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener)
+ {
+ if (log.isDebugEnabled())
{
- if(hasChanged(currentMember.getConnector().a, member.getConnector().a)
&& member.getConnector().a != null)
+ log.debug(this + "::PPP Removing topology listener " + listener, new
Exception("Trace"));
+ }
+ synchronized (topologyListeners)
+ {
+ topologyListeners.remove(listener);
+ }
+ }
+
+ public boolean addMember(final String nodeId, final TopologyMember member, final
boolean last)
+ {
+ boolean replaced = false;
+
+ synchronized (this)
+ {
+ TopologyMember currentMember = topology.get(nodeId);
+
+ if (Topology.log.isDebugEnabled())
{
- currentMember.getConnector().a = member.getConnector().a;
- replaced = true;
+ Topology.log.debug(this + "::adding = " + nodeId + ":" +
member.getConnector(), new Exception("trace"));
+ Topology.log.debug(describe("Before:"));
}
- if(hasChanged(currentMember.getConnector().b, member.getConnector().b)
&& member.getConnector().b != null)
+
+ if (currentMember == null)
{
- currentMember.getConnector().b = member.getConnector().b;
replaced = true;
+ if (Topology.log.isDebugEnabled())
+ {
+ Topology.log.debug("ZZZ " + this +
+ " MEMBER WAS NULL, Add member nodeId=" +
+ nodeId +
+ " member = " +
+ member +
+ " replaced = " +
+ replaced +
+ " size = " +
+ topology.size(), new Exception("trace"));
+ }
+ topology.put(nodeId, member);
}
+ else
+ {
+ if (hasChanged(currentMember.getConnector().a, member.getConnector().a)
&& member.getConnector().a != null)
+ {
+ currentMember.getConnector().a = member.getConnector().a;
+ replaced = true;
+ }
+ if (hasChanged(currentMember.getConnector().b, member.getConnector().b)
&& member.getConnector().b != null)
+ {
+ currentMember.getConnector().b = member.getConnector().b;
+ replaced = true;
+ }
- if(member.getConnector().a == null)
+ if (member.getConnector().a == null)
+ {
+ member.getConnector().a = currentMember.getConnector().a;
+ }
+ if (member.getConnector().b == null)
+ {
+ member.getConnector().b = currentMember.getConnector().b;
+ }
+ }
+
+ if (Topology.log.isDebugEnabled())
{
- member.getConnector().a = currentMember.getConnector().a;
+ Topology.log.debug(this + "::Topology updated=" + replaced);
+ Topology.log.debug(describe(this + "::After:"));
}
- if(member.getConnector().b == null)
+
+ if (Topology.log.isDebugEnabled())
{
- member.getConnector().b = currentMember.getConnector().b;
+ Topology.log.debug("ZZZ " + this +
+ " Add member nodeId=" +
+ nodeId +
+ " member = " +
+ member +
+ " replaced = " +
+ replaced +
+ " size = " +
+ topology.size(), new Exception("trace"));
}
+
}
- if(debug)
+
+ if (replaced)
{
- log.debug(this + "::Topology updated=" + replaced);
- log.debug(describe(this + "::After:"));
+ ArrayList<ClusterTopologyListener> copy = copyListeners();
+ for (ClusterTopologyListener listener : copy)
+ {
+ if (Topology.log.isTraceEnabled())
+ {
+ Topology.log.trace("XXX ZZZ " + this + " informing " +
listener + " about node up = " + nodeId);
+ }
+
+ listener.nodeUP(nodeId, member.getConnector(), last);
+ }
}
+
+ return replaced;
+ }
+
+ /**
+ * @return
+ */
+ private ArrayList<ClusterTopologyListener> copyListeners()
+ {
+ ArrayList <ClusterTopologyListener> listenersCopy;
+ synchronized (topologyListeners)
+ {
+ listenersCopy = new
ArrayList<ClusterTopologyListener>(topologyListeners);
+ }
+ return listenersCopy;
+ }
+
+ public boolean removeMember(final String nodeId)
+ {
+ TopologyMember member;
- if (log.isDebugEnabled())
+ synchronized (this)
{
- log.debug("ZZZ " + this + " Add member nodeId=" + nodeId +
" member = " + member + " replaced = " + replaced + " size =
" + topology.size(), new Exception ("trace"));
+ member = topology.remove(nodeId);
}
-
- return replaced;
+
+ if (member != null)
+ {
+ ArrayList<ClusterTopologyListener> copy = copyListeners();
+
+ for (ClusterTopologyListener listener : copy)
+ {
+ if (Topology.log.isTraceEnabled())
+ {
+ Topology.log.trace("XXX ZZZ " + this + " informing " +
listener + " about node down = " + nodeId);
+ }
+ listener.nodeDown(nodeId);
+ }
+ }
+
+ if (Topology.log.isDebugEnabled())
+ {
+ Topology.log.debug("ZZZ " + this +
+ " removing nodeID=" +
+ nodeId +
+ ", result=" +
+ member +
+ ", size = " +
+ topology.size(), new Exception("trace"));
+ }
+
+ return member != null;
}
- public synchronized boolean removeMember(String nodeId)
+ /**
+ * it will send all the member updates to listeners, independently of being changed or
not
+ * @param nodeID
+ * @param member
+ */
+ public void sendMemberToListeners(String nodeID, TopologyMember member)
{
- TopologyMember member = topology.remove(nodeId);
- if (log.isDebugEnabled())
+ // To make sure it was updated
+ addMember(nodeID, member, false);
+
+ ArrayList<ClusterTopologyListener> copy = copyListeners();
+
+ // Now force sending it
+ for (ClusterTopologyListener listener : copy)
{
- log.debug("ZZZ " + this + " removing nodeID=" + nodeId +
", result=" + member + ", size = " + topology.size(), new Exception
("trace"));
+ if (log.isDebugEnabled())
+ {
+ log.debug("Informing client listener " + listener +
+ " about itself node " +
+ nodeID +
+ " with connector=" +
+ member.getConnector());
+ }
+ listener.nodeUP(nodeID, member.getConnector(), false);
}
- return (member != null);
}
- public void sendTopology(ClusterTopologyListener listener)
+ public synchronized void sendTopology(final ClusterTopologyListener listener)
{
int count = 0;
+
Map<String, TopologyMember> copy;
+
synchronized (this)
{
copy = new HashMap<String, TopologyMember>(topology);
}
+
for (Map.Entry<String, TopologyMember> entry : copy.entrySet())
{
listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count ==
copy.size());
}
}
- public TopologyMember getMember(String nodeID)
+ public TopologyMember getMember(final String nodeID)
{
return topology.get(nodeID);
}
@@ -169,8 +307,6 @@
int count = 0;
for (TopologyMember member : topology.values())
{
-
- // ARRUMAR ISSO
if (member.getConnector().a != null)
{
count++;
@@ -182,12 +318,13 @@
}
return count;
}
+
public synchronized String describe()
{
return describe("");
}
- public synchronized String describe(String text)
+ public synchronized String describe(final String text)
{
String desc = text + "\n";
@@ -201,11 +338,11 @@
public void clear()
{
- if (log.isDebugEnabled())
+ if (Topology.log.isDebugEnabled())
{
- log.debug("ZZZ " + this + "::clear", new Exception
("trace"));
+ Topology.log.debug("ZZZ III " + this + "::clear", new
Exception("trace"));
}
- topology.clear();
+ // topology.clear();
}
public int members()
@@ -213,28 +350,30 @@
return topology.size();
}
- private boolean hasChanged(TransportConfiguration currentConnector,
TransportConfiguration connector)
+ public void setOwner(final Object owner)
{
- return (currentConnector == null && connector != null) || (currentConnector
!= null && !currentConnector.equals(connector));
+ this.owner = owner;
}
- public TransportConfiguration getBackupForConnector(TransportConfiguration
connectorConfiguration)
+ private boolean hasChanged(final TransportConfiguration currentConnector, final
TransportConfiguration connector)
{
+ return currentConnector == null && connector != null ||
+ currentConnector != null &&
+ !currentConnector.equals(connector);
+ }
+
+ public TransportConfiguration getBackupForConnector(final TransportConfiguration
connectorConfiguration)
+ {
for (TopologyMember member : topology.values())
{
- if(member.getConnector().a != null &&
member.getConnector().a.equals(connectorConfiguration))
+ if (member.getConnector().a != null &&
member.getConnector().a.equals(connectorConfiguration))
{
- return member.getConnector().b;
+ return member.getConnector().b;
}
}
return null;
}
- public void setDebug(boolean b)
- {
- debug = b;
- }
-
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@@ -250,5 +389,5 @@
return "Topology@" +
Integer.toHexString(System.identityHashCode(this)) + "[owner=" + owner +
"]";
}
}
-
+
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-28
17:44:46 UTC (rev 11065)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-29
04:22:24 UTC (rev 11066)
@@ -118,13 +118,18 @@
{
public void nodeUP(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last)
{
- channel0.sendAndFlush(new ClusterTopologyChangeMessage(nodeID,
connectorPair, last));
+ channel0.send(new ClusterTopologyChangeMessage(nodeID,
connectorPair, last));
}
public void nodeDown(String nodeID)
{
- channel0.sendAndFlush(new ClusterTopologyChangeMessage(nodeID));
+ channel0.send(new ClusterTopologyChangeMessage(nodeID));
}
+
+ public String toString()
+ {
+ return "Remote Proxy on channel " +
Integer.toHexString(System.identityHashCode(this));
+ }
};
final boolean isCC = msg.isClusterConnection();
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-28
17:44:46 UTC (rev 11065)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-29
04:22:24 UTC (rev 11066)
@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -105,7 +106,7 @@
private final boolean routeWhenNoConsumers;
- private final Map<String, MessageFlowRecord> records = new HashMap<String,
MessageFlowRecord>();
+ private final Map<String, MessageFlowRecord> records = new
ConcurrentHashMap<String, MessageFlowRecord>();
private final ScheduledExecutorService scheduledExecutor;
@@ -498,7 +499,7 @@
// ClusterTopologyListener implementation
------------------------------------------------------------------
- public synchronized void nodeDown(final String nodeID)
+ public void nodeDown(final String nodeID)
{
if (log.isDebugEnabled())
{
@@ -533,7 +534,7 @@
}
- public synchronized void nodeUP(final String nodeID,
+ public void nodeUP(final String nodeID,
final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
final boolean last)
{
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-28
17:44:46 UTC (rev 11065)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-29
04:22:24 UTC (rev 11066)
@@ -100,8 +100,6 @@
// the cluster connections which links this node to other cluster nodes
private final Map<String, ClusterConnection> clusterConnections = new
HashMap<String, ClusterConnection>();
- private final Set<ClusterTopologyListener> topologyListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
-
private final Topology topology = new Topology(this);
private volatile ServerLocatorInternal backupServerLocator;
@@ -258,10 +256,7 @@
});
started = false;
- topologyListeners.clear();
clusterConnections.clear();
- topology.clear();
-
}
public void notifyNodeDown(String nodeID)
@@ -273,16 +268,8 @@
log.debug("XXX " + this + "::removing nodeID=" + nodeID, new
Exception ("trace"));
- boolean removed = topology.removeMember(nodeID);
+ topology.removeMember(nodeID);
- if (removed)
- {
-
- for (ClusterTopologyListener listener : topologyListeners)
- {
- listener.nodeDown(nodeID);
- }
- }
}
public void notifyNodeUp(final String nodeID,
@@ -296,7 +283,7 @@
}
TopologyMember member = new TopologyMember(connectorPair);
- boolean updated = topology.addMember(nodeID, member);
+ boolean updated = topology.addMember(nodeID, member, last);
if (!updated)
{
@@ -307,11 +294,6 @@
return;
}
- for (ClusterTopologyListener listener : topologyListeners)
- {
- listener.nodeUP(nodeID, member.getConnector(), last);
- }
-
if (log.isDebugEnabled())
{
log.debug("XXX " + this + " received notifyNodeUp nodeID=" +
nodeID + " connectorPair=" + connectorPair +
@@ -365,7 +347,7 @@
public void addClusterTopologyListener(final ClusterTopologyListener listener, final
boolean clusterConnection)
{
- topologyListeners.add(listener);
+ topology.addClusterTopologyListener(listener);
// We now need to send the current topology to the client
executor.execute(new Runnable(){
@@ -380,7 +362,7 @@
public void removeClusterTopologyListener(final ClusterTopologyListener listener,
final boolean
clusterConnection)
{
- topologyListeners.add(listener);
+ topology.removeClusterTopologyListener(listener);
}
public Topology getTopology()
@@ -398,14 +380,10 @@
String nodeID = server.getNodeID().toString();
TopologyMember member = topology.getMember(nodeID);
- // we swap the topology backup now = live
- if (member != null)
- {
- member.getConnector().a = member.getConnector().b;
+ //swap backup as live and send it to everybody
+ member = new TopologyMember(new Pair<TransportConfiguration,
TransportConfiguration>(member.getConnector().b, null));
+ topology.addMember(nodeID, member, false);
- member.getConnector().b = null;
- }
-
if (backupServerLocator != null)
{
// todo we could use the topology of this to preempt it arriving from the cc
@@ -456,15 +434,8 @@
log.warn("unable to start bridge " + bridge.getName(), e);
}
}
-
- for (ClusterTopologyListener listener : topologyListeners)
- {
- if (log.isDebugEnabled())
- {
- log.debug("Informing client listener " + listener + " about
itself node " + nodeID + " with connector=" + member.getConnector());
- }
- listener.nodeUP(nodeID, member.getConnector(), false);
- }
+
+ topology.sendMemberToListeners(nodeID, member);
}
}
@@ -519,7 +490,7 @@
null));
}
- topology.addMember(nodeID, member);
+ topology.addMember(nodeID, member, false);
}
else
{
@@ -532,13 +503,6 @@
// pair.a = cc.getConnector();
}
}
-
- // Propagate the announcement
-
- for (ClusterTopologyListener listener : topologyListeners)
- {
- listener.nodeUP(nodeID, member.getConnector(), false);
- }
}
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration
config) throws Exception
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-28
17:44:46 UTC (rev 11065)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-29
04:22:24 UTC (rev 11066)
@@ -456,7 +456,7 @@
}
catch (Exception e)
{
- log.info("unable to restart server, please kill
and restart manually", e);
+ log.warn("unable to restart server, please kill
and restart manually", e);
}
}
});
@@ -465,6 +465,7 @@
}
catch (Exception e)
{
+ log.debug(e.getMessage(), e);
//hopefully it will work next call
}
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-07-28
17:44:46 UTC (rev 11065)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-07-29
04:22:24 UTC (rev 11066)
@@ -265,14 +265,16 @@
{
if (nodes == topology.getMembers().size())
{
- return;
+
+ log.info("ZZZ III look up for topology on " + topology + " size
= " + topology.getMembers().size());
+ return;
}
Thread.sleep(10);
}
while (System.currentTimeMillis() - start < timeout);
- String msg = "Timed out waiting for cluster topology of " + nodes +
" (received " + topology.getMembers().size() + ") nodes on server = "
+ server + ")\n Current topology:" + topology.describe();
+ String msg = "ZZZ Timed out waiting for cluster topology of " + nodes +
" (received " + topology.getMembers().size() + ") topology = " +
topology + ")\n Current topology:" + topology.describe();
ClusterTestBase.log.error(msg);
@@ -1988,6 +1990,7 @@
{
for (int node : nodes)
{
+ log.info("#test start node " + node);
servers[node].setIdentity("server " + node);
ClusterTestBase.log.info("starting server " + servers[node]);
servers[node].start();
@@ -1997,20 +2000,14 @@
ClusterTestBase.log.info("started server " + node);
waitForServer(servers[node]);
-
- for (int i = 0 ; i <= node; i++)
- {
- try
- {
- log.info("Describing Server " + servers[i]);
- log.info(servers[i].describe());
- }
- catch (Throwable ignored)
- {
-
- }
- }
}
+
+ for (int node: nodes)
+ {
+ System.out.println(servers[node].describe());
+ }
+
+
}
protected void waitForServer(HornetQServer server)
@@ -2050,6 +2047,7 @@
log.info("Stopping nodes " + Arrays.toString(nodes));
for (int node : nodes)
{
+ log.info("#test stop server " + node);
if (servers[node] != null && servers[node].isStarted())
{
try
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2011-07-28
17:44:46 UTC (rev 11065)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2011-07-29
04:22:24 UTC (rev 11066)
@@ -149,7 +149,6 @@
setupCluster();
startServers(5, 0);
- servers[0].getClusterManager().getTopology().setDebug(true);
setupSessionFactory(0, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-07-28
17:44:46 UTC (rev 11065)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-07-29
04:22:24 UTC (rev 11066)
@@ -128,21 +128,12 @@
stopServers(0, 1);
}
- public void _testLoop() throws Exception
- {
- for (int i = 0; i < 1000; i++)
- {
- log.info("#test " + i);
- testStopStart();
- tearDown();
- setUp();
- }
- }
-
public void testRestartTest() throws Throwable
{
startServers(0, 1);
waitForTopology(servers[0], 2);
+
+ log.info("ZZZ Server 0 " + servers[0].describe());
// try
// {
@@ -160,15 +151,30 @@
for (int i = 0; i < 100; i++)
{
log.info("#stop #test #" + i);
+ Thread.sleep(500);
stopServers(1);
waitForTopology(servers[0], 1, 2000);
log.info("#start #test #" + i);
+ Thread.sleep(500);
startServers(1);
+ Thread.sleep(500);
waitForTopology(servers[0], 2, 2000);
+ waitForTopology(servers[1], 2, 2000);
}
}
+ public void testLoop() throws Exception
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ log.info("#test " + i);
+ testStopStart();
+ tearDown();
+ setUp();
+ }
+ }
+
public void testStopStart() throws Exception
{
startServers(0, 1);
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-07-28
17:44:46 UTC (rev 11065)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-07-29
04:22:24 UTC (rev 11066)
@@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
@@ -26,6 +27,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.ClusterConnection;
@@ -194,6 +196,8 @@
startServers(0);
ServerLocator locator = createHAServerLocator();
+
+ ((ServerLocatorImpl)locator).getTopology().setOwner("ZZZ III
testReceive");
final List<String> nodes = new ArrayList<String>();
final CountDownLatch upLatch = new CountDownLatch(5);
@@ -207,18 +211,32 @@
{
if(!nodes.contains(nodeID))
{
+ System.out.println("Node UP " + nodeID + " added");
+ log.info("ZZZ III Node UP " + nodeID + " added");
nodes.add(nodeID);
upLatch.countDown();
}
+ else
+ {
+ System.out.println("Node UP " + nodeID + " was already
here");
+ log.info("ZZZ III Node UP " + nodeID + " was already
here");
+ }
}
public void nodeDown(String nodeID)
{
if (nodes.contains(nodeID))
{
+ log.info("ZZZ III Node down " + nodeID + "
accepted");
+ System.out.println("Node down " + nodeID + "
accepted");
nodes.remove(nodeID);
downLatch.countDown();
}
+ else
+ {
+ log.info("ZZZ III Node down " + nodeID + " already
removed");
+ System.out.println("Node down " + nodeID + " already
removed");
+ }
}
});