Author: borges
Date: 2012-02-15 11:40:29 -0500 (Wed, 15 Feb 2012)
New Revision: 12125
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
tearDown: Terminate blocking sends earlier, unblock the channel.
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2012-02-15 16:39:59 UTC (rev 12124)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2012-02-15 16:40:29 UTC (rev 12125)
@@ -40,6 +40,11 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.Context;
@@ -55,11 +60,13 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
@@ -80,6 +87,7 @@
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.QueueBinding;
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
+import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
@@ -131,7 +139,9 @@
private final Collection<HornetQServer> servers = new
ArrayList<HornetQServer>();
private final Collection<ServerLocator> locators = new
ArrayList<ServerLocator>();
private final Collection<ClientSessionFactory> sessionFactories = new
ArrayList<ClientSessionFactory>();
- private final Collection<ClientSession> clientSessions = new
ArrayList<ClientSession>();
+ private final Collection<ClientSession> clientSessions = new
HashSet<ClientSession>(); // a set, so registering the same session instance twice keeps a single entry
+ private final Collection<ClientConsumer> clientConsumers = new
HashSet<ClientConsumer>(); // filled by addClientConsumer(), emptied by closeAllClientConsumers()
+ private final ExecutorService executorService = Executors.newSingleThreadExecutor(); // runs the TerminateBlockedClientSession watchdog tasks; NOTE(review): no shutdown is visible in this diff -- confirm it is stopped elsewhere
private boolean checkThread = true;
@@ -1503,6 +1513,15 @@
return session;
}
+ protected ClientConsumer addClientConsumer(ClientConsumer consumer) // records the consumer in clientConsumers and hands the same instance back, so the call can wrap a createConsumer(...) expression
+ {
+ synchronized (clientConsumers) // the collection itself is the lock for every access to it
+ {
+ clientConsumers.add(consumer);
+ }
+ return consumer;
+ }
+
protected void addSessionFactory(ClientSessionFactory sf)
{
synchronized (sessionFactories)
@@ -1511,19 +1530,86 @@
}
}
+ private class TerminateBlockedClientSession implements Runnable // watchdog: waits up to 3 seconds for the latch, and on timeout calls unlock() and returnBlocking() on the channel
+ {
+ private final Channel channel; // channel acted on if the wait times out
+ private final CountDownLatch latch; // counted down by the caller once its close() call has returned
+
+ public TerminateBlockedClientSession(Channel c, CountDownLatch latch)
+ {
+ this.channel = c;
+ this.latch = latch;
+ }
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (latch.await(3, TimeUnit.SECONDS)) // true: the latch reached zero in time, so there is nothing to do
+ return;
+ channel.unlock(); // timed out; per the commit log this is meant to unblock the channel -- exact semantics live in Channel, TODO confirm
+ channel.returnBlocking(); // per the commit log this is meant to terminate a pending blocking send -- TODO confirm against Channel
+ }
+ catch (InterruptedException e)
+ {
+ // interruption is ok, and gets ignored.
+ }
+ }
+ }
+
+ protected final void closeAllClientConsumers() // closes every registered consumer that is non-null and not yet closed, then empties the registry
+ {
+ synchronized (clientConsumers) // held for the whole loop, including the close() calls
+ {
+ for (ClientConsumer cc : clientConsumers)
+ {
+ if (cc == null || cc.isClosed()) // nothing to close for this entry
+ continue;
+ try
+ {
+ if (cc instanceof ClientConsumerInternal)
+ {
+ ClientConsumerInternal cci = (ClientConsumerInternal)cc;
+ Channel channel = cci.getSession().getChannel(); // channel the watchdog will act on if close() does not return in time
+ final CountDownLatch latch = new CountDownLatch(1);
+ final Future<?> future = executorService.submit(new
TerminateBlockedClientSession(channel, latch)); // start the watchdog before attempting the close
+ cci.close();
+ latch.countDown(); // close() returned: let the watchdog exit without touching the channel
+ future.cancel(false); // drop the task if it has not started yet; false = do not interrupt it if it is already running
+ }
+ else
+ {
+ cc.close(); // no watchdog is used for consumers that are not ClientConsumerInternal
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); // best effort: report the failure and carry on with the remaining consumers
+ }
+ }
+ clientConsumers.clear(); // entries are forgotten whether or not their close() succeeded
+ }
+ }
+
protected void closeAllClientSessions()
{
synchronized (clientSessions)
{
- for (ClientSession cs : clientSessions)
+ for (final ClientSession cs : clientSessions)
{
- if (cs == null)
+ if (cs == null || cs.isClosed())
continue;
try
{
if (cs instanceof ClientSessionInternal)
{
- ((ClientSessionInternal)cs).cleanUp(false);
+ ClientSessionInternal csi = ((ClientSessionInternal)cs);
+ Channel channel = csi.getChannel();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final Future<?> future = executorService.submit(new
TerminateBlockedClientSession(channel, latch));
+ csi.close();
+ latch.countDown();
+ future.cancel(false);
}
else
{
@@ -1532,7 +1618,7 @@
}
catch (Exception e)
{
- // no-op
+ e.printStackTrace(); // no-op
}
}
clientSessions.clear();
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2012-02-15 16:39:59 UTC (rev 12124)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2012-02-15 16:40:29 UTC (rev 12125)
@@ -162,7 +162,15 @@
protected void tearDown() throws Exception
{
log.info("#test tearDown");
- closeAllConsumers();
+ // closeAllConsumers();
+ for (ConsumerHolder ch : consumers)
+ {
+ addClientConsumer(ch.consumer);
+ addClientSession(ch.session);
+ }
+ closeAllClientConsumers();
+ closeAllClientSessions();
+
closeAllSessionFactories();
closeAllServerLocatorsFactories();
for (int i = 0; i < MAX_SERVERS; i++)
@@ -534,7 +542,7 @@
filterString = ClusterTestBase.FILTER_PROP.toString() + "='" +
filterVal + "'";
}
- ClientConsumer consumer = session.createConsumer(queueName, filterString);
+ ClientConsumer consumer = addClientConsumer(session.createConsumer(queueName,
filterString));
session.start();