[hornetq-commits] JBoss hornetq SVN: r11529 - in trunk/hornetq-core/src: test/java/org/hornetq/tests/util and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Oct 13 05:54:50 EDT 2011


Author: ataylor
Date: 2011-10-13 05:54:50 -0400 (Thu, 13 Oct 2011)
New Revision: 11529

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
Log:
added check for hanging retries after shutdown

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-10-13 07:06:03 UTC (rev 11528)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-10-13 09:54:50 UTC (rev 11529)
@@ -16,15 +16,8 @@
 import java.lang.ref.WeakReference;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.locks.Lock;
 
 import org.hornetq.api.core.HornetQBuffer;
@@ -156,6 +149,8 @@
 
    private final Object waitLock = new Object();
 
+   public final static List<CloseRunnable> CLOSE_RUNNABLES = new ArrayList<CloseRunnable>(); // CloseRunnables currently inside run(). NOTE(review): plain ArrayList, mutated from run() on closeExecutor and copied by the test tearDown - confirm thread-safety
+
    // Static
    // ---------------------------------------------------------------------------------------
 
@@ -1453,17 +1448,7 @@
                serverLocator.notifyNodeDown(System.currentTimeMillis(), msg.getNodeID().toString());
             }
 
-            closeExecutor.execute(new Runnable()
-            {
-               // Must be executed on new thread since cannot block the netty thread for a long time and fail can
-               // cause reconnect loop
-               public void run()
-               {
-                  conn.fail(new HornetQException(HornetQException.DISCONNECTED,
-                                                 "The connection was disconnected because of server shutdown"));
-
-               }
-            });
+            closeExecutor.execute(new CloseRunnable(conn));
          }
          else if (type == PacketImpl.CLUSTER_TOPOLOGY)
          {
@@ -1520,8 +1505,42 @@
             }
          }
       }
+
+
    }
 
+   public class CloseRunnable implements Runnable // fails a connection with DISCONNECTED; listed in CLOSE_RUNNABLES while running
+   {
+      private CoreRemotingConnection conn; // the connection that run() will fail
+
+      public CloseRunnable(CoreRemotingConnection conn)
+      {
+         this.conn = conn;
+      }
+
+      // Must be executed on a new thread: it cannot block the netty thread for a long time, and fail() can
+      // cause a reconnect loop
+      public void run()
+      {
+         CLOSE_RUNNABLES.add(this); // registered only for the duration of conn.fail()
+         try
+         {
+            conn.fail(new HornetQException(HornetQException.DISCONNECTED,
+                  "The connection was disconnected because of server shutdown"));
+         } finally
+         {
+            CLOSE_RUNNABLES.remove(this); // always deregister, even if fail() throws
+         }
+
+      }
+
+      public ClientSessionFactoryImpl stop() // calls causeExit() and returns the enclosing factory
+      {
+         causeExit(); // defined outside this hunk; presumably aborts the pending retries - confirm
+         return ClientSessionFactoryImpl.this;
+      }
+
+   }
    private class DelegatingBufferHandler implements BufferHandler
    {
       public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)

Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java	2011-10-13 07:06:03 UTC (rev 11528)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java	2011-10-13 09:54:50 UTC (rev 11529)
@@ -57,6 +57,7 @@
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.client.impl.ServerLocatorImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -913,8 +914,25 @@
    @Override
    protected void tearDown() throws Exception
    {
+      List<ClientSessionFactoryImpl.CloseRunnable> closeRunnables = new ArrayList<ClientSessionFactoryImpl.CloseRunnable>(ClientSessionFactoryImpl.CLOSE_RUNNABLES);
+      ArrayList<Exception> exceptions = new ArrayList<Exception>();
+      if(!closeRunnables.isEmpty())
+      {
+         for (ClientSessionFactoryImpl.CloseRunnable closeRunnable : closeRunnables)
+         {
+            exceptions.add(closeRunnable.stop().e);
+         }
+      }
       cleanupPools();
-
+      //clean up pools before failing
+      if(!exceptions.isEmpty())
+      {
+         for (Exception exception : exceptions)
+         {
+            exception.printStackTrace();
+         }
+         fail("Client Session Factories still tryint to reconnect, see above to see where created");
+      }
       Map<Thread, StackTraceElement[]> threadMap = Thread.getAllStackTraces();
       for (Thread thread : threadMap.keySet())
       {
@@ -925,12 +943,12 @@
             {
                alreadyFailedThread.add(thread);
                System.out.println(threadDump(this.getName() + " has left threads running. Look at thread " +
-                                             thread.getName() +
-                                             " id = " +
-                                             thread.getId() +
-                                             " has running locators on test " +
-                                             this.getName() +
-                                             " on this following dump"));
+                     thread.getName() +
+                     " id = " +
+                     thread.getId() +
+                     " has running locators on test " +
+                     this.getName() +
+                     " on this following dump"));
                fail("test left serverlocator running, this could effect other tests");
             }
             else if (stackTraceElement.getMethodName().contains("BroadcastGroupImpl.run") && !alreadyFailedThread.contains(thread))



More information about the hornetq-commits mailing list