[hornetq-commits] JBoss hornetq SVN: r12125 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Feb 15 11:40:30 EST 2012


Author: borges
Date: 2012-02-15 11:40:29 -0500 (Wed, 15 Feb 2012)
New Revision: 12125

Modified:
   trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
tearDown: Terminate blocking sends earlier, unblock the channel.

Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java	2012-02-15 16:39:59 UTC (rev 12124)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java	2012-02-15 16:40:29 UTC (rev 12125)
@@ -40,6 +40,11 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.naming.Context;
@@ -55,11 +60,13 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.client.impl.ClientConsumerInternal;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.client.impl.ServerLocatorImpl;
@@ -80,6 +87,7 @@
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.QueueBinding;
 import org.hornetq.core.postoffice.impl.LocalQueueBinding;
+import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMRegistry;
@@ -131,7 +139,9 @@
    private final Collection<HornetQServer> servers = new ArrayList<HornetQServer>();
    private final Collection<ServerLocator> locators = new ArrayList<ServerLocator>();
    private final Collection<ClientSessionFactory> sessionFactories = new ArrayList<ClientSessionFactory>();
-   private final Collection<ClientSession> clientSessions = new ArrayList<ClientSession>();
+   private final Collection<ClientSession> clientSessions = new HashSet<ClientSession>();
+   private final Collection<ClientConsumer> clientConsumers = new HashSet<ClientConsumer>();
+   private final ExecutorService executorService = Executors.newSingleThreadExecutor();
 
    private boolean checkThread = true;
 
@@ -1503,6 +1513,15 @@
       return session;
    }
 
+   protected ClientConsumer addClientConsumer(ClientConsumer consumer)
+   {
+      synchronized (clientConsumers)
+      {
+         clientConsumers.add(consumer);
+      }
+      return consumer;
+   }
+
    protected void addSessionFactory(ClientSessionFactory sf)
    {
       synchronized (sessionFactories)
@@ -1511,19 +1530,86 @@
       }
    }
 
+   private class TerminateBlockedClientSession implements Runnable
+   {
+      private final Channel channel;
+      private final CountDownLatch latch;
+
+      public TerminateBlockedClientSession(Channel c, CountDownLatch latch)
+      {
+         this.channel = c;
+         this.latch = latch;
+      }
+      @Override
+      public void run()
+      {
+         try
+         {
+            if (latch.await(3, TimeUnit.SECONDS))
+               return;
+            channel.unlock();
+            channel.returnBlocking();
+         }
+         catch (InterruptedException e)
+         {
+            // interruption is ok, and gets ignored.
+         }
+      }
+   }
+
+   protected final void closeAllClientConsumers()
+   {
+      synchronized (clientConsumers)
+      {
+         for (ClientConsumer cc : clientConsumers)
+         {
+            if (cc == null || cc.isClosed())
+               continue;
+            try
+            {
+               if (cc instanceof ClientConsumerInternal)
+               {
+                  ClientConsumerInternal cci = (ClientConsumerInternal)cc;
+                  Channel channel = cci.getSession().getChannel();
+                  final CountDownLatch latch = new CountDownLatch(1);
+                  final Future<?> future = executorService.submit(new TerminateBlockedClientSession(channel, latch));
+                  cci.close();
+                  latch.countDown();
+                  future.cancel(false);
+               }
+               else
+               {
+                  cc.close();
+               }
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace(); // no-op
+            }
+         }
+         clientConsumers.clear();
+      }
+   }
+
    protected void closeAllClientSessions()
    {
       synchronized (clientSessions)
       {
-         for (ClientSession cs : clientSessions)
+         for (final ClientSession cs : clientSessions)
          {
-            if (cs == null)
+            if (cs == null || cs.isClosed())
                continue;
             try
             {
                if (cs instanceof ClientSessionInternal)
                {
-                  ((ClientSessionInternal)cs).cleanUp(false);
+                  ClientSessionInternal csi = ((ClientSessionInternal)cs);
+                  Channel channel = csi.getChannel();
+                  final CountDownLatch latch = new CountDownLatch(1);
+                  final Future<?> future = executorService.submit(new TerminateBlockedClientSession(channel, latch));
+                  csi.close();
+                  latch.countDown();
+                  future.cancel(false);
                }
                else
                {
@@ -1532,7 +1618,7 @@
             }
             catch (Exception e)
             {
-               // no-op
+               e.printStackTrace(); // no-op
             }
          }
          clientSessions.clear();

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2012-02-15 16:39:59 UTC (rev 12124)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2012-02-15 16:40:29 UTC (rev 12125)
@@ -162,7 +162,15 @@
    protected void tearDown() throws Exception
    {
       log.info("#test tearDown");
-      closeAllConsumers();
+      // closeAllConsumers();
+      for (ConsumerHolder ch : consumers)
+      {
+         addClientConsumer(ch.consumer);
+         addClientSession(ch.session);
+      }
+      closeAllClientConsumers();
+      closeAllClientSessions();
+
       closeAllSessionFactories();
       closeAllServerLocatorsFactories();
       for (int i = 0; i < MAX_SERVERS; i++)
@@ -534,7 +542,7 @@
             filterString = ClusterTestBase.FILTER_PROP.toString() + "='" + filterVal + "'";
          }
 
-         ClientConsumer consumer = session.createConsumer(queueName, filterString);
+         ClientConsumer consumer = addClientConsumer(session.createConsumer(queueName, filterString));
 
          session.start();
 



More information about the hornetq-commits mailing list