[hornetq-commits] JBoss hornetq SVN: r11219 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/protocol/core/impl and 10 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Aug 23 22:16:48 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-08-23 22:16:47 -0400 (Tue, 23 Aug 2011)
New Revision: 11219
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ConnectionEntry.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Improvements on test suite (fixing a few intermittent issues)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -34,7 +34,7 @@
public class Topology implements Serializable
{
- private static final int BACKOF_TIMEOUT = 500;
+ private static final int BACKOF_TIMEOUT = 50;
private static final long serialVersionUID = -9037171688692471371L;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -37,7 +37,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -53,9 +52,9 @@
public class CoreProtocolManager implements ProtocolManager
{
private static final Logger log = Logger.getLogger(CoreProtocolManager.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
-
+
private final HornetQServer server;
private final List<Interceptor> interceptors;
@@ -70,13 +69,14 @@
public ConnectionEntry createConnectionEntry(final Connection connection)
{
final Configuration config = server.getConfiguration();
+
+ Executor connectionExecutor = server.getExecutorFactory().getExecutor();
final CoreRemotingConnection rc = new RemotingConnectionImpl(connection,
interceptors,
- config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
- .getExecutor()
+ config.isAsyncConnectionExecutionEnabled() ? connectionExecutor
: null,
- server.getNodeID());
+ server.getNodeID());
Channel channel1 = rc.getChannel(1, -1);
@@ -91,7 +91,7 @@
ttl = config.getConnectionTTLOverride();
}
- final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(), ttl);
+ final ConnectionEntry entry = new ConnectionEntry(rc, connectionExecutor, System.currentTimeMillis(), ttl);
final Channel channel0 = rc.getChannel(0, -1);
@@ -115,11 +115,9 @@
else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY)
{
SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage)packet;
-
+
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
- Executor executor = server.getExecutorFactory().getExecutor();
-
public void nodeUP(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
@@ -127,21 +125,21 @@
// Using an executor as most of the notifications on the Topology
// may come from a channel itself
// What could cause deadlocks
- executor.execute(new Runnable()
+ entry.connectionExecutor.execute(new Runnable()
{
public void run()
{
channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
}
});
- }
+ }
public void nodeDown(final String nodeID)
{
// Using an executor as most of the notifications on the Topology
// may come from a channel itself
// What could cause deadlocks
- executor.execute(new Runnable()
+ entry.connectionExecutor.execute(new Runnable()
{
public void run()
{
@@ -149,17 +147,17 @@
}
});
}
-
+
public String toString()
{
return "Remote Proxy on channel " + Integer.toHexString(System.identityHashCode(this));
}
};
-
+
final boolean isCC = msg.isClusterConnection();
-
+
server.getClusterManager().addClusterTopologyListener(listener, isCC);
-
+
rc.addCloseListener(new CloseListener()
{
public void connectionClosed()
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -120,13 +120,13 @@
if (ttl != -1)
{
- return new ConnectionEntry(conn, System.currentTimeMillis(), ttl);
+ return new ConnectionEntry(conn, null, System.currentTimeMillis(), ttl);
}
else
{
// Default to 1 minute - which is same as core protocol
- return new ConnectionEntry(conn, System.currentTimeMillis(), 1 * 60 * 1000);
+ return new ConnectionEntry(conn, null, System.currentTimeMillis(), 1 * 60 * 1000);
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -181,6 +181,8 @@
void destroyBridge(String name) throws Exception;
ServerSession getSessionByID(String sessionID);
+
+ void threadDump(String reason);
void stop(boolean failoverOnServerShutdown) throws Exception;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -53,6 +53,8 @@
void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, boolean nodeAnnounce);
Topology getTopology();
+
+ void flushExecutor();
void announceBackup() throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -56,6 +56,7 @@
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.Future;
import org.hornetq.utils.UUID;
/**
@@ -123,10 +124,10 @@
this.executorFactory = executorFactory;
- executor = executorFactory.getExecutor();
-
- topology.setExecutor(executorFactory.getExecutor());
+ executor = executorFactory.getExecutor();;
+ topology.setExecutor(executor);
+
this.server = server;
this.postOffice = postOffice;
@@ -340,6 +341,16 @@
}
}
+ public void flushExecutor()
+ {
+ Future future = new Future();
+ executor.execute(future);
+ if (!future.await(10000))
+ {
+ server.threadDump("Couldn't flush ClusterManager executor (" + this + ") in 10 seconds, verify your thread pool size");
+ }
+ }
+
public boolean isStarted()
{
return started;
@@ -486,7 +497,7 @@
{
this.clusterLocators.add(serverLocator);
}
-
+
public void removeClusterLocator(final ServerLocatorInternal serverLocator)
{
this.clusterLocators.remove(serverLocator);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -625,7 +625,7 @@
activation = new SharedNothingBackupActivation();
}
- backupActivationThread = new Thread(activation);
+ backupActivationThread = new Thread(activation, "Activation for server " + this);
backupActivationThread.start();
}
@@ -652,7 +652,41 @@
stopped = true;
stop(configuration.isFailoverOnServerShutdown());
}
+
+ public void threadDump(final String reason)
+ {
+ StringWriter str = new StringWriter();
+ PrintWriter out = new PrintWriter(str);
+ Map<Thread, StackTraceElement[]> stackTrace = Thread.getAllStackTraces();
+
+ out.println("Generating thread dump because - " + reason);
+ out.println("*******************************************************************************");
+
+ for (Map.Entry<Thread, StackTraceElement[]> el : stackTrace.entrySet())
+ {
+ out.println("===============================================================================");
+ out.println("Thread " + el.getKey() +
+ " name = " +
+ el.getKey().getName() +
+ " id = " +
+ el.getKey().getId() +
+ " group = " +
+ el.getKey().getThreadGroup());
+ out.println();
+ for (StackTraceElement traceEl : el.getValue())
+ {
+ out.println(traceEl);
+ }
+ }
+
+ out.println("===============================================================================");
+ out.println("End Thread dump");
+ out.println("*******************************************************************************");
+
+ log.warn(str.toString());
+ }
+
public void stop(boolean failoverOnServerShutdown) throws Exception
{
synchronized (this)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ConnectionEntry.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ConnectionEntry.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ConnectionEntry.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -13,7 +13,9 @@
package org.hornetq.spi.core.protocol;
+import java.util.concurrent.Executor;
+
/**
* A ConnectionEntry
*
@@ -28,13 +30,17 @@
public volatile long lastCheck;
public volatile long ttl;
+
+ public final Executor connectionExecutor;
- public ConnectionEntry(final RemotingConnection connection, final long lastCheck, final long ttl)
+ public ConnectionEntry(final RemotingConnection connection, final Executor connectionExecutor, final long lastCheck, final long ttl)
{
this.connection = connection;
this.lastCheck = lastCheck;
this.ttl = ttl;
+
+ this.connectionExecutor = connectionExecutor;
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -2041,23 +2041,6 @@
}
- protected void waitForServer(HornetQServer server) throws InterruptedException
- {
- long timetowait = System.currentTimeMillis() + 5000;
- while (!server.isStarted())
- {
- Thread.sleep(100);
- if (server.isStarted())
- {
- break;
- }
- else if (System.currentTimeMillis() > timetowait)
- {
- fail("server didnt start");
- }
- }
- }
-
protected void stopClusterConnections(final int... nodes) throws Exception
{
for (int node : nodes)
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -97,7 +97,7 @@
{
e.printStackTrace();
log.error(e.getMessage(), e);
- throw e;
+ throw e;
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -70,7 +70,10 @@
protected abstract boolean isNetty();
- protected int waitForNewLive(long seconds, boolean waitForNewBackup, Map<Integer, TestableServer> servers, int... nodes)
+ protected int waitForNewLive(long seconds,
+ boolean waitForNewBackup,
+ Map<Integer, TestableServer> servers,
+ int... nodes)
{
long time = System.currentTimeMillis();
long toWait = seconds * 1000;
@@ -84,12 +87,12 @@
{
newLive = node;
}
- else if(newLive != -1)
+ else if (newLive != -1)
{
- if(waitForNewBackup)
+ if (waitForNewBackup)
+ {
+ if (node != newLive && servers.get(node).isStarted())
{
- if(node != newLive && servers.get(node).isStarted())
- {
return newLive;
}
}
@@ -99,6 +102,7 @@
}
}
}
+
try
{
Thread.sleep(100);
@@ -146,7 +150,7 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message2 = consumer.receive(10000);
-
+
assertNotNull(message2);
Assert.assertEquals("aardvarks", message2.getBodyBuffer().readString());
@@ -174,10 +178,10 @@
sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
- if (!ok)
+ if (!ok)
{
System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
- }
+ }
assertTrue(ok);
return sf;
}
@@ -191,7 +195,7 @@
}
return new ServerLocatorImpl(true, configs);
}
-
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
@@ -213,9 +217,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID,
- Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last)
+ public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -46,8 +46,11 @@
createBackupConfig(0, 3, 0, 1, 2, 4, 5);
createBackupConfig(0, 4, 0, 1, 2, 3, 5);
createBackupConfig(0, 5, 0, 1, 2, 3, 4);
+
servers.get(0).start();
+ waitForServer(servers.get(0).getServer());
servers.get(1).start();
+ waitForServer(servers.get(1).getServer());
servers.get(2).start();
servers.get(3).start();
servers.get(4).start();
@@ -63,30 +66,35 @@
int backupNode;
ClientSession session = sendAndConsume(sf, true);
System.out.println("failing node 0");
+ Thread.sleep(500);
servers.get(0).crash(session);
session.close();
backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
session = sendAndConsume(sf, false);
System.out.println("failing node " + backupNode);
+ Thread.sleep(500);
servers.get(backupNode).crash(session);
session.close();
backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
session = sendAndConsume(sf, false);
System.out.println("failing node " + backupNode);
+ Thread.sleep(1000);
servers.get(backupNode).crash(session);
session.close();
backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
session = sendAndConsume(sf, false);
System.out.println("failing node " + backupNode);
+ Thread.sleep(500);
servers.get(backupNode).crash(session);
session.close();
backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
session = sendAndConsume(sf, false);
System.out.println("failing node " + backupNode);
+ Thread.sleep(500);
servers.get(backupNode).crash(session);
session.close();
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -112,6 +112,7 @@
}
ClusterManagerImpl clusterManager = (ClusterManagerImpl) server.getClusterManager();
+ clusterManager.flushExecutor();
clusterManager.clear();
server.stop(true);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -165,7 +165,37 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+
+ protected void waitForServer(HornetQServer server) throws InterruptedException
+ {
+ long timetowait = System.currentTimeMillis() + 5000;
+ while (!server.isStarted() && System.currentTimeMillis() < timetowait)
+ {
+ Thread.sleep(100);
+ }
+
+ if (!server.isStarted())
+ {
+ log.info(threadDump("Server didn't start"));
+ fail("server didnt start");
+ }
+
+ if (!server.getConfiguration().isBackup())
+ {
+ timetowait = System.currentTimeMillis() + 5000;
+ while (!server.isInitialised() && System.currentTimeMillis() < timetowait)
+ {
+ Thread.sleep(100);
+ }
+
+ if (!server.isInitialised())
+ {
+ fail("Server didn't initialize");
+ }
+ }
+ }
+
protected HornetQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-23 13:34:40 UTC (rev 11218)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-24 02:16:47 UTC (rev 11219)
@@ -99,6 +99,8 @@
private static final Logger log = Logger.getLogger(UnitTestCase.class);
+ private static final Logger logInstance = Logger.getLogger(UnitTestCase.class);
+
public static final String INVM_ACCEPTOR_FACTORY = "org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory";
public static final String INVM_CONNECTOR_FACTORY = "org.hornetq.core.remoting.impl.invm.InVMConnectorFactory";
@@ -268,7 +270,7 @@
public static void forceGC()
{
- log.info("#test forceGC");
+ logInstance.info("#test forceGC");
WeakReference<Object> dumbReference = new WeakReference<Object>(new Object());
// A loop that will wait GC, using the minimal time as possible
while (dumbReference.get() != null)
@@ -282,7 +284,7 @@
{
}
}
- log.info("#test forceGC Done");
+ logInstance.info("#test forceGC Done");
}
public static void forceGC(Reference<?> ref, long timeout)
More information about the hornetq-commits
mailing list