Author: clebert.suconic(a)jboss.com
Date: 2011-08-25 22:41:29 -0400 (Thu, 25 Aug 2011)
New Revision: 11226
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
Log:
fixing testsuite
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-08-25
19:23:40 UTC (rev 11225)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-08-26
02:41:29 UTC (rev 11226)
@@ -1839,7 +1839,10 @@
private void doCleanup(boolean failingOver)
{
- remotingConnection.removeFailureListener(this);
+ if (remotingConnection == null)
+ {
+ remotingConnection.removeFailureListener(this);
+ }
if (log.isDebugEnabled())
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-25
19:23:40 UTC (rev 11225)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-26
02:41:29 UTC (rev 11226)
@@ -30,12 +30,14 @@
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @author Clebert Suconic
* Created Aug 16, 2010
*/
public class Topology implements Serializable
{
- private static final int BACKOF_TIMEOUT = 500;
+ // TODO: remove the backof from this class. It's probably not needed any longer
+ // private static final int BACKOF_TIMEOUT = 500;
private static final long serialVersionUID = -9037171688692471371L;
@@ -43,7 +45,7 @@
private static final Logger log = Logger.getLogger(Topology.class);
- private transient HashMap<String, Pair<Long, Integer>> mapBackof = new
HashMap<String, Pair<Long, Integer>>();
+ // private transient HashMap<String, Pair<Long, Integer>> mapBackof = new
HashMap<String, Pair<Long, Integer>>();
private Executor executor = null;
@@ -62,13 +64,16 @@
* keys are node IDs
* values are a pair of live/backup transport configurations
*/
- private final Map<String, TopologyMember> topology = new
ConcurrentHashMap<String, TopologyMember>();
+ private final Map<String, TopologyMember> mapTopology = new
ConcurrentHashMap<String, TopologyMember>();
public Topology(final Object owner)
{
this.owner = owner;
- Topology.log.debug("Topology@" +
Integer.toHexString(System.identityHashCode(this)) + " CREATE",
- new Exception("trace")); // Delete this line
+ if (log.isTraceEnabled())
+ {
+ Topology.log.trace("Topology@" +
Integer.toHexString(System.identityHashCode(this)) + " CREATE",
+ new Exception("trace"));
+ }
}
public void setExecutor(final Executor executor)
@@ -100,143 +105,156 @@
}
}
- public boolean addMember(final String nodeId, final TopologyMember member, final
boolean last)
+ public boolean addMember(final String nodeId, final TopologyMember memberInput, final
boolean last)
{
- boolean replaced = false;
-
synchronized (this)
{
- TopologyMember currentMember = topology.get(nodeId);
+ TopologyMember currentMember = mapTopology.get(nodeId);
- if (Topology.log.isDebugEnabled())
- {
- Topology.log.debug(this + "::adding = " + nodeId + ":" +
member.getConnector(), new Exception("trace"));
- }
-
if (currentMember == null)
{
- if (!testBackof(nodeId))
+ /*if (!testBackof(nodeId))
{
return false;
- }
+ } */
- replaced = true;
if (Topology.log.isDebugEnabled())
{
- Topology.log.debug("Add " + this +
+ Topology.log.debug(this + "::NewMemeberAdd " + this +
" MEMBER WAS NULL, Add member nodeId=" +
nodeId +
" member = " +
- member +
- " replaced = " +
- replaced +
+ memberInput +
" size = " +
- topology.size(), new Exception("trace"));
+ mapTopology.size(), new Exception("trace"));
}
- topology.put(nodeId, member);
+ mapTopology.put(nodeId, memberInput);
+ sendMemberUp(nodeId, memberInput);
+ return true;
}
else
{
- if (hasChanged(currentMember.getConnector().a, member.getConnector().a)
&& member.getConnector().a != null)
+ if (log.isTraceEnabled())
{
- if (!testBackof(nodeId))
+ log.trace(this + ":: validating update for currentMember=" +
currentMember + " of memberInput=" + memberInput);
+ }
+
+ boolean replaced = false;
+ TopologyMember memberToSend = currentMember;
+
+ if (hasChanged("a", memberToSend.getConnector().a,
memberInput.getConnector().a))
+ {
+ /*if (!replaced && !testBackof(nodeId))
{
return false;
- }
-
- currentMember.getConnector().a = member.getConnector().a;
+ }*/
+ memberToSend = new TopologyMember(memberInput.getConnector().a,
memberToSend.getConnector().b);
replaced = true;
}
- if (hasChanged(currentMember.getConnector().b, member.getConnector().b)
&& member.getConnector().b != null)
+
+ if (hasChanged("b", memberToSend.getConnector().b,
memberInput.getConnector().b))
{
- if (!testBackof(nodeId))
+ /*if (!replaced && !testBackof(nodeId))
{
return false;
- }
-
- currentMember.getConnector().b = member.getConnector().b;
+ }*/
+ memberToSend = new TopologyMember(memberToSend.getConnector().a,
memberInput.getConnector().b);
replaced = true;
}
- if (member.getConnector().a == null)
+ if (replaced)
{
- member.getConnector().a = currentMember.getConnector().a;
+ mapTopology.remove(nodeId);
+ mapTopology.put(nodeId, memberToSend);
+
+ sendMemberUp(nodeId, memberToSend);
+ return true;
}
- if (member.getConnector().b == null)
- {
- member.getConnector().b = currentMember.getConnector().b;
- }
- }
- if (Topology.log.isDebugEnabled())
- {
- Topology.log.debug(this + " Add member nodeId=" +
- nodeId +
- " member = " +
- member +
- " replaced = " +
- replaced +
- " size = " +
- topology.size(), new Exception("trace"));
}
}
- if (replaced)
+ if (Topology.log.isDebugEnabled())
{
+ Topology.log.debug(Topology.this + " Add member nodeId=" +
+ nodeId +
+ " member = " +
+ memberInput +
+ " has been ignored since there was no change", new
Exception("trace"));
+ }
- final ArrayList<ClusterTopologyListener> copy = copyListeners();
+ return false;
+ }
- execute(new Runnable()
+ /**
+ * @param nodeId
+ * @param memberToSend
+ */
+ private void sendMemberUp(final String nodeId, final TopologyMember memberToSend)
+ {
+ final ArrayList<ClusterTopologyListener> copy = copyListeners();
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::prepare to send " + nodeId + " to " +
copy.size() + " elements");
+ }
+
+ execute(new Runnable()
+ {
+ public void run()
{
- public void run()
+ for (ClusterTopologyListener listener : copy)
{
- for (ClusterTopologyListener listener : copy)
+ if (Topology.log.isTraceEnabled())
{
- if (Topology.log.isTraceEnabled())
- {
- Topology.log.trace(this + " informing " + listener +
" about node up = " + nodeId);
- }
+ Topology.log.trace(Topology.this + " informing " + listener +
" about node up = " + nodeId);
+ }
- try
- {
- listener.nodeUP(nodeId, member.getConnector(), last);
- }
- catch (Throwable e)
- {
- log.warn(e.getMessage(), e);
- }
+ try
+ {
+ listener.nodeUP(nodeId, memberToSend.getConnector(), false);
}
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
- });
- }
-
- return replaced;
+ }
+ });
}
/**
* @param nodeId
* @param backOfData
*/
- private boolean testBackof(final String nodeId)
+ /*private boolean testBackof(final String nodeId)
{
Pair<Long, Integer> backOfData = mapBackof.get(nodeId);
if (backOfData != null)
{
backOfData.b += 1;
-
+
long timeDiff = System.currentTimeMillis() - backOfData.a;
// To prevent a loop where nodes are being considered down and up
if (backOfData.b > 5 && timeDiff < BACKOF_TIMEOUT)
{
+
// The cluster may get in loop without this..
// Case one node is stll sending nodeDown while another member is sending
nodeUp
- log.warn("The topology controller identified a blast events and it's
interrupting the flow of the loop",
+ log.warn(backOfData.b + ", The topology controller identified a blast
events and it's interrupting the flow of the loop, nodeID=" +
+ nodeId +
+ ", topologyInstance=" +
+ this,
new Exception("this exception is just to trace
location"));
return false;
}
+ else if (timeDiff < BACKOF_TIMEOUT)
+ {
+ log.warn(this + "::Simple blast of " + nodeId, new
Exception("this exception is just to trace location"));
+ }
else if (timeDiff >= BACKOF_TIMEOUT)
{
mapBackof.remove(nodeId);
@@ -244,7 +262,7 @@
}
return true;
- }
+ } */
/**
* @return
@@ -265,22 +283,22 @@
synchronized (this)
{
- Pair<Long, Integer> value = mapBackof.get(nodeId);
+// Pair<Long, Integer> value = mapBackof.get(nodeId);
+//
+// if (value == null)
+// {
+// value = new Pair<Long, Integer>(0l, 0);
+// mapBackof.put(nodeId, value);
+// }
+//
+// value.a = System.currentTimeMillis();
+//
+// if (System.currentTimeMillis() - value.a > BACKOF_TIMEOUT)
+// {
+// value.b = 0;
+// }
- if (value == null)
- {
- value = new Pair<Long, Integer>(0l, 0);
- mapBackof.put(nodeId, value);
- }
-
- value.a = System.currentTimeMillis();
-
- if (System.currentTimeMillis() - value.a > BACKOF_TIMEOUT)
- {
- value.b = 0;
- }
-
- member = topology.remove(nodeId);
+ member = mapTopology.remove(nodeId);
}
if (Topology.log.isDebugEnabled())
@@ -291,7 +309,7 @@
", result=" +
member +
", size = " +
- topology.size(), new Exception("trace"));
+ mapTopology.size(), new Exception("trace"));
}
if (member != null)
@@ -376,19 +394,19 @@
log.debug(this + " is sending topology to " + listener);
}
- final Map<String, TopologyMember> copy;
-
- synchronized (this)
- {
- copy = new HashMap<String, TopologyMember>(topology);
- }
-
execute(new Runnable()
{
public void run()
{
int count = 0;
+ final Map<String, TopologyMember> copy;
+
+ synchronized (Topology.this)
+ {
+ copy = new HashMap<String, TopologyMember>(mapTopology);
+ }
+
for (Map.Entry<String, TopologyMember> entry : copy.entrySet())
{
if (log.isDebugEnabled())
@@ -408,12 +426,12 @@
public TopologyMember getMember(final String nodeID)
{
- return topology.get(nodeID);
+ return mapTopology.get(nodeID);
}
public boolean isEmpty()
{
- return topology.isEmpty();
+ return mapTopology.isEmpty();
}
public Collection<TopologyMember> getMembers()
@@ -421,7 +439,7 @@
ArrayList<TopologyMember> members;
synchronized (this)
{
- members = new ArrayList<TopologyMember>(topology.values());
+ members = new ArrayList<TopologyMember>(mapTopology.values());
}
return members;
}
@@ -429,7 +447,7 @@
public synchronized int nodes()
{
int count = 0;
- for (TopologyMember member : topology.values())
+ for (TopologyMember member : mapTopology.values())
{
if (member.getConnector().a != null)
{
@@ -452,7 +470,7 @@
{
String desc = text + "\n";
- for (Entry<String, TopologyMember> entry : new HashMap<String,
TopologyMember>(topology).entrySet())
+ for (Entry<String, TopologyMember> entry : new HashMap<String,
TopologyMember>(mapTopology).entrySet())
{
desc += "\t" + entry.getKey() + " => " + entry.getValue()
+ "\n";
}
@@ -466,12 +484,12 @@
{
Topology.log.debug(this + "::clear", new
Exception("trace"));
}
- topology.clear();
+ mapTopology.clear();
}
public int members()
{
- return topology.size();
+ return mapTopology.size();
}
/** The owner exists mainly for debug purposes.
@@ -482,16 +500,26 @@
this.owner = owner;
}
- private boolean hasChanged(final TransportConfiguration currentConnector, final
TransportConfiguration connector)
+ private boolean hasChanged(final String debugInfo, final TransportConfiguration a,
final TransportConfiguration b)
{
- return currentConnector == null && connector != null ||
- currentConnector != null &&
- !currentConnector.equals(connector);
+ boolean changed = a == null && b != null || a != null && b != null
&& !a.equals(b);
+
+ if (log.isTraceEnabled())
+ {
+
+ log.trace(this + "::Validating current=" + a
+ + " != input=" + b +
+ (changed ? " and it has changed" : " and it didn't
change") +
+ ", for validation of " +
+ debugInfo);
+ }
+
+ return changed;
}
public TransportConfiguration getBackupForConnector(final TransportConfiguration
connectorConfiguration)
{
- for (TopologyMember member : topology.values())
+ for (TopologyMember member : mapTopology.values())
{
if (member.getConnector().a != null &&
member.getConnector().a.equals(connectorConfiguration))
{
@@ -509,7 +537,7 @@
{
if (owner == null)
{
- return super.toString();
+ return "Topology@" +
Integer.toHexString(System.identityHashCode(this));
}
else
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java 2011-08-25
19:23:40 UTC (rev 11225)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java 2011-08-26
02:41:29 UTC (rev 11226)
@@ -32,6 +32,11 @@
this.connector = connector;
}
+ public TopologyMember(TransportConfiguration a, TransportConfiguration b)
+ {
+ this(new Pair<TransportConfiguration, TransportConfiguration>(a, b));
+ }
+
public Pair<TransportConfiguration, TransportConfiguration> getConnector()
{
return connector;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-25
19:23:40 UTC (rev 11225)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-26
02:41:29 UTC (rev 11226)
@@ -564,6 +564,10 @@
{
if (connectorPair.b != null)
{
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::informing about backup to itself, nodeUUID=" +
nodeUUID + ", connectorPair=" + connectorPair + " this = " + this);
+ }
server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
}
return;
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-25
19:23:40 UTC (rev 11225)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-26
02:41:29 UTC (rev 11226)
@@ -2021,9 +2021,9 @@
for (int node : nodes)
{
log.info("#test start node " + node);
- if (System.currentTimeMillis() - timeStarts[node] < 1000)
+ if (System.currentTimeMillis() - timeStarts[node] < 100)
{
- Thread.sleep(1000);
+ Thread.sleep(100);
}
timeStarts[node] = System.currentTimeMillis();
@@ -2065,10 +2065,10 @@
{
try
{
- if (System.currentTimeMillis() - timeStarts[node] < 1000)
+ if (System.currentTimeMillis() - timeStarts[node] < 100)
{
// We can't stop and start a node too fast (faster than what the
Topology could realize about this
- Thread.sleep(1000);
+ Thread.sleep(100);
}
timeStarts[node] = System.currentTimeMillis();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-08-25
19:23:40 UTC (rev 11225)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-08-26
02:41:29 UTC (rev 11226)
@@ -112,7 +112,6 @@
setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 3, 4);
- // startServers(3, 4, 5, 0, 1, 2);
startServers(0, 1, 2, 3, 4, 5);
log.info("");
@@ -122,11 +121,7 @@
log.info(debugBindings(servers[i],
servers[i].getConfiguration().getManagementNotificationAddress().toString()));
}
log.info("");
-
- //stopServers(3);
-
- Thread.sleep(1000);
-
+
log.info("");
for (int i = 0; i <= 5; i++)
{
@@ -168,21 +163,13 @@
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
- // Thread.sleep(1500);
-
createQueue(0, "queues.testaddress", "queue0", null, false);
- // Thread.sleep(1500);
createQueue(1, "queues.testaddress", "queue0", null, false);
- // Thread.sleep(1500);
createQueue(2, "queues.testaddress", "queue0", null, false);
- // Thread.sleep(1500);
addConsumer(0, 0, "queue0", null);
- // Thread.sleep(1500);
addConsumer(1, 1, "queue0", null);
- // Thread.sleep(1500);
addConsumer(2, 2, "queue0", null);
- // Thread.sleep(1500);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
@@ -291,21 +278,14 @@
// Need to wait some time so the bridges and
// connectors had time to connect properly between the nodes
- Thread.sleep(1000);
createQueue(0, "queues.testaddress", "queue0", null, true);
- // Thread.sleep(1500);
createQueue(1, "queues.testaddress", "queue0", null, true);
- // Thread.sleep(1500);
createQueue(2, "queues.testaddress", "queue0", null, true);
- // Thread.sleep(1500);
addConsumer(0, 0, "queue0", null);
- // Thread.sleep(1500);
addConsumer(1, 1, "queue0", null);
- // Thread.sleep(1500);
addConsumer(2, 2, "queue0", null);
- // Thread.sleep(1500);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
@@ -375,21 +355,13 @@
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
- // Thread.sleep(1500);
-
createQueue(0, "queues.testaddress", "queue0", null, true);
- // Thread.sleep(1500);
createQueue(1, "queues.testaddress", "queue0", null, true);
- // Thread.sleep(1500);
createQueue(2, "queues.testaddress", "queue0", null, true);
- // Thread.sleep(1500);
addConsumer(0, 0, "queue0", null);
- // Thread.sleep(1500);
addConsumer(1, 1, "queue0", null);
- // Thread.sleep(1500);
addConsumer(2, 2, "queue0", null);
- // Thread.sleep(1500);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
@@ -402,16 +374,8 @@
send(0, "queues.testaddress", 33, true, null);
verifyReceiveRoundRobin(33, 0, 1, 2);
-
- Thread.sleep(1000);
- // TODO: need to make sure the shutdown won't be send, what will affect the
test
stopServers(2);
-//
-// Thread.sleep(5000);
-//
-// waitForBindings(0, "queues.testaddress", 2, 2, false);
-// waitForBindings(1, "queues.testaddress", 2, 2, false);
send(0, "queues.testaddress", 100, true, null);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2011-08-25
19:23:40 UTC (rev 11225)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2011-08-26
02:41:29 UTC (rev 11226)
@@ -395,6 +395,8 @@
backupJMSService.setContext(ctx2);
+ backupJMSService.getHornetQServer().setIdentity("JMSBackup");
+ log.info("Starting backup");
backupJMSService.start();
liveConf = createBasicConfig(0);
@@ -432,6 +434,9 @@
liveJMSService.setContext(ctx1);
+ liveJMSService.getHornetQServer().setIdentity("JMSLive");
+ log.info("Starting life");
+
liveJMSService.start();
JMSUtil.waitForServer(backupService);