[hornetq-commits] JBoss hornetq SVN: r11220 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster/impl and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Aug 24 19:02:39 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-08-24 19:02:39 -0400 (Wed, 24 Aug 2011)
New Revision: 11220
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
Log:
fixing tests
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-24 02:16:47 UTC (rev 11219)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-24 23:02:39 UTC (rev 11220)
@@ -1415,12 +1415,19 @@
if (type == PacketImpl.DISCONNECT)
{
final DisconnectMessage msg = (DisconnectMessage)packet;
+
+ SimpleString nodeID = msg.getNodeID();
if (log.isTraceEnabled())
{
- log.trace("Disconnect being called on client:" + msg + " server locator = " + serverLocator, new Exception ("trace"));
+ log.trace("Disconnect being called on client:" + msg + " server locator = " + serverLocator + " notifying node " + nodeID + " as down", new Exception ("trace"));
}
+ if (nodeID != null)
+ {
+ serverLocator.notifyNodeDown(msg.getNodeID().toString());
+ }
+
closeExecutor.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for a long time and fail can
@@ -1430,15 +1437,6 @@
conn.fail(new HornetQException(HornetQException.DISCONNECTED,
"The connection was disconnected because of server shutdown"));
- SimpleString nodeID = msg.getNodeID();
- if (log.isTraceEnabled())
- {
- log.trace("notifyDown nodeID=" + msg.getNodeID() + " on serverLocator=" + serverLocator + " csf created at ", ClientSessionFactoryImpl.this.e);
- }
- if (nodeID != null)
- {
- serverLocator.notifyNodeDown(msg.getNodeID().toString());
- }
}
});
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-24 02:16:47 UTC (rev 11219)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-24 23:02:39 UTC (rev 11220)
@@ -23,6 +23,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.logging.Logger;
@@ -35,15 +36,15 @@
{
private static final int BACKOF_TIMEOUT = 50;
-
+
private static final long serialVersionUID = -9037171688692471371L;
private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
private static final Logger log = Logger.getLogger(Topology.class);
-
- private transient HashMap<String, Long> mapBackof = new HashMap<String, Long>();
+ private transient HashMap<String, Pair<Long, Integer>> mapBackof = new HashMap<String, Pair<Long, Integer>>();
+
private Executor executor = null;
/** Used to debug operations.
@@ -55,7 +56,6 @@
* */
private volatile Object owner;
-
/**
* topology describes the other cluster nodes that this server knows about:
*
@@ -80,7 +80,7 @@
{
if (log.isDebugEnabled())
{
- log.debug(this + "::PPP Adding topology listener " + listener, new Exception("Trace"));
+ log.debug(this + "::Adding topology listener " + listener, new Exception("Trace"));
}
synchronized (topologyListeners)
{
@@ -106,16 +106,6 @@
synchronized (this)
{
- Long lastTime = mapBackof.get(nodeId);
-
- if (lastTime != null && System.currentTimeMillis() - lastTime.longValue() < BACKOF_TIMEOUT)
- {
- // The cluster may get in loop without this..
- // Case one node is stll sending nodeDown while another member is sending nodeUp
- log.warn("Node was considered down too fast, ignoring addMember on Topology", new Exception("trace"));
- return false;
- }
-
TopologyMember currentMember = topology.get(nodeId);
if (Topology.log.isDebugEnabled())
@@ -125,6 +115,11 @@
if (currentMember == null)
{
+ if (!testBackof(nodeId))
+ {
+ return false;
+ }
+
replaced = true;
if (Topology.log.isDebugEnabled())
{
@@ -144,11 +139,21 @@
{
if (hasChanged(currentMember.getConnector().a, member.getConnector().a) && member.getConnector().a != null)
{
+ if (!testBackof(nodeId))
+ {
+ return false;
+ }
+
currentMember.getConnector().a = member.getConnector().a;
replaced = true;
}
if (hasChanged(currentMember.getConnector().b, member.getConnector().b) && member.getConnector().b != null)
{
+ if (!testBackof(nodeId))
+ {
+ return false;
+ }
+
currentMember.getConnector().b = member.getConnector().b;
replaced = true;
}
@@ -180,9 +185,8 @@
if (replaced)
{
-
final ArrayList<ClusterTopologyListener> copy = copyListeners();
-
+
execute(new Runnable()
{
public void run()
@@ -211,6 +215,38 @@
}
/**
+ * @param nodeId
+ * @param backOfData
+ */
+ private boolean testBackof(final String nodeId)
+ {
+ Pair<Long, Integer> backOfData = mapBackof.get(nodeId);
+
+ if (backOfData != null)
+ {
+ backOfData.b += 1;
+
+ long timeDiff = System.currentTimeMillis() - backOfData.a;
+
+ // To prevent a loop where nodes are being considered down and up
+ if (backOfData.b > 5 && timeDiff < BACKOF_TIMEOUT)
+ {
+ // The cluster may get in loop without this..
+ // Case one node is stll sending nodeDown while another member is sending nodeUp
+ log.warn("The topology controller identified a blast of nodeUp and down and it is ignoring a nodeUP",
+ new Exception("this exception is just to trace location"));
+ return false;
+ }
+ else if (timeDiff >= BACKOF_TIMEOUT)
+ {
+ mapBackof.remove(nodeId);
+ }
+ }
+
+ return true;
+ }
+
+ /**
* @return
*/
private ArrayList<ClusterTopologyListener> copyListeners()
@@ -229,7 +265,21 @@
synchronized (this)
{
- mapBackof.put(nodeId, new Long(System.currentTimeMillis()));
+ Pair<Long, Integer> value = mapBackof.get(nodeId);
+
+ if (value == null)
+ {
+ value = new Pair<Long, Integer>(0l, 0);
+ mapBackof.put(nodeId, value);
+ }
+
+ value.a = System.currentTimeMillis();
+
+ if (System.currentTimeMillis() - value.a > BACKOF_TIMEOUT)
+ {
+ value.b = 0;
+ }
+
member = topology.remove(nodeId);
}
@@ -273,7 +323,7 @@
}
return member != null;
}
-
+
protected void execute(final Runnable runnable)
{
if (executor != null)
@@ -291,43 +341,69 @@
* @param nodeID
* @param member
*/
- public void sendMemberToListeners(String nodeID, TopologyMember member)
+ public void sendMemberToListeners(final String nodeID, final TopologyMember member)
{
// To make sure it was updated
addMember(nodeID, member, false);
- ArrayList<ClusterTopologyListener> copy = copyListeners();
+ final ArrayList<ClusterTopologyListener> copy = copyListeners();
- // Now force sending it
- for (ClusterTopologyListener listener : copy)
+ execute(new Runnable()
{
- if (log.isDebugEnabled())
+ public void run()
{
- log.debug("Informing client listener " + listener +
- " about itself node " +
- nodeID +
- " with connector=" +
- member.getConnector());
+ // Now force sending it
+ for (ClusterTopologyListener listener : copy)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("Informing client listener " + listener +
+ " about itself node " +
+ nodeID +
+ " with connector=" +
+ member.getConnector());
+ }
+ listener.nodeUP(nodeID, member.getConnector(), false);
+ }
}
- listener.nodeUP(nodeID, member.getConnector(), false);
- }
+ });
}
- public void sendTopology(final ClusterTopologyListener listener)
+ public synchronized void sendTopology(final ClusterTopologyListener listener)
{
- int count = 0;
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " is sending topology to " + listener);
+ }
- Map<String, TopologyMember> copy;
+ final Map<String, TopologyMember> copy;
synchronized (this)
{
copy = new HashMap<String, TopologyMember>(topology);
}
- for (Map.Entry<String, TopologyMember> entry : copy.entrySet())
+ execute(new Runnable()
{
- listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == copy.size());
- }
+ public void run()
+ {
+ int count = 0;
+
+ for (Map.Entry<String, TopologyMember> entry : copy.entrySet())
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug(Topology.this + " sending " +
+ entry.getKey() +
+ " / " +
+ entry.getValue().getConnector() +
+ " to " +
+ listener);
+ }
+ listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == copy.size());
+ }
+ }
+ });
}
public TopologyMember getMember(final String nodeID)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-24 02:16:47 UTC (rev 11219)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-24 23:02:39 UTC (rev 11220)
@@ -380,15 +380,8 @@
{
topology.addClusterTopologyListener(listener);
- // We now need to send the current topology to the client
- executor.execute(new Runnable()
- {
- public void run()
- {
- topology.sendTopology(listener);
-
- }
- });
+ // no need to use an executor here since the Topology is already using one
+ topology.sendTopology(listener);
}
public void removeClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-08-24 02:16:47 UTC (rev 11219)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-08-24 23:02:39 UTC (rev 11220)
@@ -169,7 +169,7 @@
removeConsumer(1);
// Need to wait some time as we need to handle all redistributions before we stop the servers
- Thread.sleep(5000);
+ Thread.sleep(1000);
for (int i = 0; i <= 2; i++)
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-08-24 02:16:47 UTC (rev 11219)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-08-24 23:02:39 UTC (rev 11220)
@@ -173,7 +173,8 @@
ClientSessionFactoryInternal sf;
CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
- locator.addClusterTopologyListener(new LatchClusterTopologyListener(countDownLatch));
+ LatchClusterTopologyListener topListener = new LatchClusterTopologyListener(countDownLatch);
+ locator.addClusterTopologyListener(topListener);
sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
@@ -182,6 +183,7 @@
{
System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
}
+ locator.removeClusterTopologyListener(topListener);
assertTrue(ok);
return sf;
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2011-08-24 02:16:47 UTC (rev 11219)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2011-08-24 23:02:39 UTC (rev 11220)
@@ -20,10 +20,12 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
@@ -36,7 +38,19 @@
protected Map<Integer, TestableServer> servers = new HashMap<Integer, TestableServer>();
private NodeManager nodeManager;
+
+ Logger log = Logger.getLogger(SingleLiveMultipleBackupsFailoverTest.class);
+ public void _testLoop() throws Exception
+ {
+ for (int i = 0 ; i < 100; i++)
+ {
+ log.info("#test " + i);
+ testMultipleFailovers();
+ tearDown();
+ setUp();
+ }
+ }
public void testMultipleFailovers() throws Exception
{
nodeManager = new InVMNodeManager();
@@ -56,7 +70,12 @@
servers.get(4).start();
servers.get(5).start();
- ServerLocator locator = getServerLocator(0);
+ ServerLocatorImpl locator = (ServerLocatorImpl)getServerLocator(0);
+
+ Topology topology = locator.getTopology();
+
+ // for logging and debugging
+ topology.setOwner("testMultipleFailovers");
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
@@ -65,36 +84,32 @@
ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
int backupNode;
ClientSession session = sendAndConsume(sf, true);
- System.out.println("failing node 0");
- Thread.sleep(500);
+
+ log.info("failing node 0");
servers.get(0).crash(session);
session.close();
backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
session = sendAndConsume(sf, false);
- System.out.println("failing node " + backupNode);
- Thread.sleep(500);
+ log.info("failing node " + backupNode);
servers.get(backupNode).crash(session);
session.close();
backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
session = sendAndConsume(sf, false);
- System.out.println("failing node " + backupNode);
- Thread.sleep(1000);
+ log.info("failing node " + backupNode);
servers.get(backupNode).crash(session);
session.close();
backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
session = sendAndConsume(sf, false);
- System.out.println("failing node " + backupNode);
- Thread.sleep(500);
+ log.info("failing node " + backupNode);
servers.get(backupNode).crash(session);
session.close();
backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
session = sendAndConsume(sf, false);
- System.out.println("failing node " + backupNode);
- Thread.sleep(500);
+ log.info("failing node " + backupNode);
servers.get(backupNode).crash(session);
session.close();
More information about the hornetq-commits mailing list