[hornetq-commits] JBoss hornetq SVN: r12126 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Feb 16 06:39:07 EST 2012


Author: borges
Date: 2012-02-16 06:39:06 -0500 (Thu, 16 Feb 2012)
New Revision: 12126

Modified:
   trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
Improve tearDown: close sessionFactories, assert client(Session|Consumer)s are closed.

Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java	2012-02-15 16:40:29 UTC (rev 12125)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java	2012-02-16 11:39:06 UTC (rev 12126)
@@ -40,11 +40,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.naming.Context;
@@ -66,9 +61,7 @@
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.client.impl.ClientConsumerInternal;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.client.impl.ServerLocatorImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -87,7 +80,6 @@
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.QueueBinding;
 import org.hornetq.core.postoffice.impl.LocalQueueBinding;
-import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMRegistry;
@@ -99,6 +91,8 @@
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.cluster.ClusterManager;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.transaction.impl.XidImpl;
 import org.hornetq.utils.UUIDGenerator;
@@ -141,7 +135,7 @@
    private final Collection<ClientSessionFactory> sessionFactories = new ArrayList<ClientSessionFactory>();
    private final Collection<ClientSession> clientSessions = new HashSet<ClientSession>();
    private final Collection<ClientConsumer> clientConsumers = new HashSet<ClientConsumer>();
-   private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+   private final Collection<HornetQComponent> otherComponents = new HashSet<HornetQComponent>();
 
    private boolean checkThread = true;
 
@@ -946,22 +940,42 @@
    @Override
    protected void tearDown() throws Exception
    {
-      closeAllClientSessions();
-
       closeAllSessionFactories();
-
       closeAllServerLocatorsFactories();
 
+      assertAllClientConsumersAreClosed();
+      assertAllClientSessionsAreClosed();
+
       synchronized (servers)
       {
          for (HornetQServer server : servers)
          {
+            if (server == null)
+               continue;
+            try
+            {
+               final ClusterManager clusterManager = server.getClusterManager();
+               if (clusterManager != null)
+               {
+                  for (ClusterConnection cc : clusterManager.getClusterConnections())
+                  {
+                     stopComponent(cc);
+                  }
+               }
+            }
+            catch (Exception e)
+            {
+               // no-op
+            }
             stopComponent(server);
          }
          servers.clear();
       }
 
-      List<ClientSessionFactoryImpl.CloseRunnable> closeRunnables = new ArrayList<ClientSessionFactoryImpl.CloseRunnable>(ClientSessionFactoryImpl.CLOSE_RUNNABLES);
+      closeAllOtherComponents();
+
+      List<ClientSessionFactoryImpl.CloseRunnable> closeRunnables =
+               new ArrayList<ClientSessionFactoryImpl.CloseRunnable>(ClientSessionFactoryImpl.CLOSE_RUNNABLES);
       ArrayList<Exception> exceptions = new ArrayList<Exception>();
       try
       {
@@ -1068,6 +1082,21 @@
    }
 
    /**
+    *
+    */
+   private void closeAllOtherComponents()
+   {
+      synchronized (otherComponents)
+      {
+         for (HornetQComponent c : otherComponents)
+         {
+            stopComponent(c);
+         }
+         otherComponents.clear();
+      }
+   }
+
+   /**
     * @param buffer
     * @return
     */
@@ -1486,140 +1515,99 @@
       return sf;
    }
 
-   protected HornetQServer addServer(HornetQServer server)
+   protected final HornetQServer addServer(HornetQServer server)
    {
-      synchronized (servers)
+      if (server != null)
       {
-         servers.add(server);
+         synchronized (servers)
+         {
+            servers.add(server);
+         }
       }
       return server;
    }
 
    protected final ServerLocator addServerLocator(ServerLocator locator)
    {
-      synchronized (locators)
+      if (locator != null)
       {
-         locators.add(locator);
+         synchronized (locators)
+         {
+            locators.add(locator);
+         }
       }
       return locator;
    }
 
-   protected ClientSession addClientSession(ClientSession session)
+   protected final ClientSession addClientSession(ClientSession session)
    {
-      synchronized (clientSessions)
+      if (session != null)
       {
-         clientSessions.add(session);
+         synchronized (clientSessions)
+         {
+            clientSessions.add(session);
+         }
       }
       return session;
    }
 
-   protected ClientConsumer addClientConsumer(ClientConsumer consumer)
+   protected final ClientConsumer addClientConsumer(ClientConsumer consumer)
    {
-      synchronized (clientConsumers)
+      if (consumer != null)
       {
-         clientConsumers.add(consumer);
+         synchronized (clientConsumers)
+         {
+            clientConsumers.add(consumer);
+         }
       }
       return consumer;
    }
 
-   protected void addSessionFactory(ClientSessionFactory sf)
+   protected final void addHornetQComponent(HornetQComponent component)
    {
-      synchronized (sessionFactories)
+      if (component != null)
       {
-         sessionFactories.add(sf);
+         synchronized (otherComponents)
+         {
+            otherComponents.add(component);
+         }
       }
    }
 
-   private class TerminateBlockedClientSession implements Runnable
+   protected final void addSessionFactory(ClientSessionFactory sf)
    {
-      private final Channel channel;
-      private final CountDownLatch latch;
-
-      public TerminateBlockedClientSession(Channel c, CountDownLatch latch)
+      if (sf != null)
       {
-         this.channel = c;
-         this.latch = latch;
-      }
-      @Override
-      public void run()
-      {
-         try
+         synchronized (sessionFactories)
          {
-            if (latch.await(3, TimeUnit.SECONDS))
-               return;
-            channel.unlock();
-            channel.returnBlocking();
+            sessionFactories.add(sf);
          }
-         catch (InterruptedException e)
-         {
-            // interruption is ok, and gets ignored.
-         }
       }
    }
 
-   protected final void closeAllClientConsumers()
+   private void assertAllClientConsumersAreClosed()
    {
       synchronized (clientConsumers)
       {
          for (ClientConsumer cc : clientConsumers)
          {
-            if (cc == null || cc.isClosed())
+            if (cc == null )
                continue;
-            try
-            {
-               if (cc instanceof ClientConsumerInternal)
-               {
-                  ClientConsumerInternal cci = (ClientConsumerInternal)cc;
-                  Channel channel = cci.getSession().getChannel();
-                  final CountDownLatch latch = new CountDownLatch(1);
-                  final Future<?> future = executorService.submit(new TerminateBlockedClientSession(channel, latch));
-                  cci.close();
-                  latch.countDown();
-                  future.cancel(false);
-               }
-               else
-               {
-                  cc.close();
-               }
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace(); // no-op
-            }
+            assertTrue(cc.isClosed());
          }
          clientConsumers.clear();
       }
    }
 
-   protected void closeAllClientSessions()
+   private void assertAllClientSessionsAreClosed()
    {
       synchronized (clientSessions)
       {
          for (final ClientSession cs : clientSessions)
          {
-            if (cs == null || cs.isClosed())
+            if (cs == null)
                continue;
-            try
-            {
-               if (cs instanceof ClientSessionInternal)
-               {
-                  ClientSessionInternal csi = ((ClientSessionInternal)cs);
-                  Channel channel = csi.getChannel();
-                  final CountDownLatch latch = new CountDownLatch(1);
-                  final Future<?> future = executorService.submit(new TerminateBlockedClientSession(channel, latch));
-                  csi.close();
-                  latch.countDown();
-                  future.cancel(false);
-               }
-               else
-               {
-                  cs.close();
-               }
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace(); // no-op
-            }
+            assertTrue(cs.isClosed());
          }
          clientSessions.clear();
       }

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2012-02-15 16:40:29 UTC (rev 12125)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2012-02-16 11:39:06 UTC (rev 12126)
@@ -162,55 +162,23 @@
    protected void tearDown() throws Exception
    {
       log.info("#test tearDown");
-      // closeAllConsumers();
-      for (ConsumerHolder ch : consumers)
-      {
-         addClientConsumer(ch.consumer);
-         addClientSession(ch.session);
-      }
-      closeAllClientConsumers();
-      closeAllClientSessions();
 
-      closeAllSessionFactories();
-      closeAllServerLocatorsFactories();
       for (int i = 0; i < MAX_SERVERS; i++)
       {
-         if (servers[i] == null)
-            continue;
-         try
-         {
-            final ClusterManager clusterManager = servers[i].getClusterManager();
-            if (clusterManager != null)
-            {
-               for (ClusterConnection cc : clusterManager.getClusterConnections())
-               {
-                  cc.stop();
-               }
-            }
-         }
-         catch (Exception e)
-         {
-            // no-op
-         }
-         stopServers(i);
+         addHornetQComponent(nodeManagers[i]);
       }
-      for (int i = 0; i < MAX_SERVERS; i++)
-      {
-         stopComponent(nodeManagers[i]);
-      }
-      UnitTestCase.checkFreePort(ClusterTestBase.PORTS);
-
       servers = null;
 
       sfs = null;
 
-      consumers = null;
-
       consumers = new ConsumerHolder[ClusterTestBase.MAX_CONSUMERS];
 
       nodeManagers = null;
 
       super.tearDown();
+
+      UnitTestCase.checkFreePort(ClusterTestBase.PORTS);
+
    }
 
    // Private -------------------------------------------------------------------------------------------------------



More information about the hornetq-commits mailing list