Author: ataylor
Date: 2011-10-13 05:54:50 -0400 (Thu, 13 Oct 2011)
New Revision: 11529
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
Log:
added check for hanging retries after shutdown
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-10-13
07:06:03 UTC (rev 11528)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-10-13
09:54:50 UTC (rev 11529)
@@ -16,15 +16,8 @@
import java.lang.ref.WeakReference;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import org.hornetq.api.core.HornetQBuffer;
@@ -156,6 +149,8 @@
private final Object waitLock = new Object();
+ public final static List<CloseRunnable> CLOSE_RUNNABLES = new
ArrayList<CloseRunnable>();
+
// Static
//
---------------------------------------------------------------------------------------
@@ -1453,17 +1448,7 @@
serverLocator.notifyNodeDown(System.currentTimeMillis(),
msg.getNodeID().toString());
}
- closeExecutor.execute(new Runnable()
- {
- // Must be executed on new thread since cannot block the netty thread for
a long time and fail can
- // cause reconnect loop
- public void run()
- {
- conn.fail(new HornetQException(HornetQException.DISCONNECTED,
- "The connection was disconnected
because of server shutdown"));
-
- }
- });
+ closeExecutor.execute(new CloseRunnable(conn));
}
else if (type == PacketImpl.CLUSTER_TOPOLOGY)
{
@@ -1520,8 +1505,42 @@
}
}
}
+
+
}
+ public class CloseRunnable implements Runnable
+ {
+ private CoreRemotingConnection conn;
+
+ public CloseRunnable(CoreRemotingConnection conn)
+ {
+ this.conn = conn;
+ }
+
+ // Must be executed on new thread since cannot block the netty thread for a long
time and fail can
+ // cause reconnect loop
+ public void run()
+ {
+ CLOSE_RUNNABLES.add(this);
+ try
+ {
+ conn.fail(new HornetQException(HornetQException.DISCONNECTED,
+ "The connection was disconnected because of server
shutdown"));
+ } finally
+ {
+ CLOSE_RUNNABLES.remove(this);
+ }
+
+ }
+
+ public ClientSessionFactoryImpl stop()
+ {
+ causeExit();
+ return ClientSessionFactoryImpl.this;
+ }
+
+ }
private class DelegatingBufferHandler implements BufferHandler
{
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2011-10-13
07:06:03 UTC (rev 11528)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2011-10-13
09:54:50 UTC (rev 11529)
@@ -57,6 +57,7 @@
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -913,8 +914,25 @@
@Override
protected void tearDown() throws Exception
{
+ List<ClientSessionFactoryImpl.CloseRunnable> closeRunnables = new
ArrayList<ClientSessionFactoryImpl.CloseRunnable>(ClientSessionFactoryImpl.CLOSE_RUNNABLES);
+ ArrayList<Exception> exceptions = new ArrayList<Exception>();
+ if(!closeRunnables.isEmpty())
+ {
+ for (ClientSessionFactoryImpl.CloseRunnable closeRunnable : closeRunnables)
+ {
+ exceptions.add(closeRunnable.stop().e);
+ }
+ }
cleanupPools();
-
+ //clean up pools before failing
+ if(!exceptions.isEmpty())
+ {
+ for (Exception exception : exceptions)
+ {
+ exception.printStackTrace();
+ }
+ fail("Client Session Factories still tryint to reconnect, see above to see
where created");
+ }
Map<Thread, StackTraceElement[]> threadMap = Thread.getAllStackTraces();
for (Thread thread : threadMap.keySet())
{
@@ -925,12 +943,12 @@
{
alreadyFailedThread.add(thread);
System.out.println(threadDump(this.getName() + " has left threads
running. Look at thread " +
- thread.getName() +
- " id = " +
- thread.getId() +
- " has running locators on test "
+
- this.getName() +
- " on this following dump"));
+ thread.getName() +
+ " id = " +
+ thread.getId() +
+ " has running locators on test " +
+ this.getName() +
+ " on this following dump"));
fail("test left serverlocator running, this could effect other
tests");
}
else if
(stackTraceElement.getMethodName().contains("BroadcastGroupImpl.run") &&
!alreadyFailedThread.contains(thread))