Author: borges
Date: 2012-02-16 06:39:06 -0500 (Thu, 16 Feb 2012)
New Revision: 12126
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
Improve tearDown: close sessionFactories, assert client(Session|Consumer)s are closed.
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2012-02-15 16:40:29 UTC (rev 12125)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2012-02-16 11:39:06 UTC (rev 12126)
@@ -40,11 +40,6 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.Context;
@@ -66,9 +61,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -87,7 +80,6 @@
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.QueueBinding;
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
-import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
@@ -99,6 +91,8 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.utils.UUIDGenerator;
@@ -141,7 +135,7 @@
private final Collection<ClientSessionFactory> sessionFactories = new ArrayList<ClientSessionFactory>();
private final Collection<ClientSession> clientSessions = new HashSet<ClientSession>();
private final Collection<ClientConsumer> clientConsumers = new HashSet<ClientConsumer>();
- private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ private final Collection<HornetQComponent> otherComponents = new HashSet<HornetQComponent>();
private boolean checkThread = true;
@@ -946,22 +940,42 @@
@Override
protected void tearDown() throws Exception
{
- closeAllClientSessions();
-
closeAllSessionFactories();
-
closeAllServerLocatorsFactories();
+ assertAllClientConsumersAreClosed();
+ assertAllClientSessionsAreClosed();
+
synchronized (servers)
{
for (HornetQServer server : servers)
{
+ if (server == null)
+ continue;
+ try
+ {
+ final ClusterManager clusterManager = server.getClusterManager();
+ if (clusterManager != null)
+ {
+ for (ClusterConnection cc : clusterManager.getClusterConnections())
+ {
+ stopComponent(cc);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ // no-op
+ }
stopComponent(server);
}
servers.clear();
}
- List<ClientSessionFactoryImpl.CloseRunnable> closeRunnables = new ArrayList<ClientSessionFactoryImpl.CloseRunnable>(ClientSessionFactoryImpl.CLOSE_RUNNABLES);
+ closeAllOtherComponents();
+
+ List<ClientSessionFactoryImpl.CloseRunnable> closeRunnables =
+ new ArrayList<ClientSessionFactoryImpl.CloseRunnable>(ClientSessionFactoryImpl.CLOSE_RUNNABLES);
ArrayList<Exception> exceptions = new ArrayList<Exception>();
try
{
@@ -1068,6 +1082,21 @@
}
/**
+ *
+ */
+ private void closeAllOtherComponents()
+ {
+ synchronized (otherComponents)
+ {
+ for (HornetQComponent c : otherComponents)
+ {
+ stopComponent(c);
+ }
+ otherComponents.clear();
+ }
+ }
+
+ /**
* @param buffer
* @return
*/
@@ -1486,140 +1515,99 @@
return sf;
}
- protected HornetQServer addServer(HornetQServer server)
+ protected final HornetQServer addServer(HornetQServer server)
{
- synchronized (servers)
+ if (server != null)
{
- servers.add(server);
+ synchronized (servers)
+ {
+ servers.add(server);
+ }
}
return server;
}
protected final ServerLocator addServerLocator(ServerLocator locator)
{
- synchronized (locators)
+ if (locator != null)
{
- locators.add(locator);
+ synchronized (locators)
+ {
+ locators.add(locator);
+ }
}
return locator;
}
- protected ClientSession addClientSession(ClientSession session)
+ protected final ClientSession addClientSession(ClientSession session)
{
- synchronized (clientSessions)
+ if (session != null)
{
- clientSessions.add(session);
+ synchronized (clientSessions)
+ {
+ clientSessions.add(session);
+ }
}
return session;
}
- protected ClientConsumer addClientConsumer(ClientConsumer consumer)
+ protected final ClientConsumer addClientConsumer(ClientConsumer consumer)
{
- synchronized (clientConsumers)
+ if (consumer != null)
{
- clientConsumers.add(consumer);
+ synchronized (clientConsumers)
+ {
+ clientConsumers.add(consumer);
+ }
}
return consumer;
}
- protected void addSessionFactory(ClientSessionFactory sf)
+ protected final void addHornetQComponent(HornetQComponent component)
{
- synchronized (sessionFactories)
+ if (component != null)
{
- sessionFactories.add(sf);
+ synchronized (otherComponents)
+ {
+ otherComponents.add(component);
+ }
}
}
- private class TerminateBlockedClientSession implements Runnable
+ protected final void addSessionFactory(ClientSessionFactory sf)
{
- private final Channel channel;
- private final CountDownLatch latch;
-
- public TerminateBlockedClientSession(Channel c, CountDownLatch latch)
+ if (sf != null)
{
- this.channel = c;
- this.latch = latch;
- }
- @Override
- public void run()
- {
- try
+ synchronized (sessionFactories)
{
- if (latch.await(3, TimeUnit.SECONDS))
- return;
- channel.unlock();
- channel.returnBlocking();
+ sessionFactories.add(sf);
}
- catch (InterruptedException e)
- {
- // interruption is ok, and gets ignored.
- }
}
}
- protected final void closeAllClientConsumers()
+ private void assertAllClientConsumersAreClosed()
{
synchronized (clientConsumers)
{
for (ClientConsumer cc : clientConsumers)
{
- if (cc == null || cc.isClosed())
+ if (cc == null )
continue;
- try
- {
- if (cc instanceof ClientConsumerInternal)
- {
- ClientConsumerInternal cci = (ClientConsumerInternal)cc;
- Channel channel = cci.getSession().getChannel();
- final CountDownLatch latch = new CountDownLatch(1);
- final Future<?> future = executorService.submit(new TerminateBlockedClientSession(channel, latch));
- cci.close();
- latch.countDown();
- future.cancel(false);
- }
- else
- {
- cc.close();
- }
- }
- catch (Exception e)
- {
- e.printStackTrace(); // no-op
- }
+ assertTrue(cc.isClosed());
}
clientConsumers.clear();
}
}
- protected void closeAllClientSessions()
+ private void assertAllClientSessionsAreClosed()
{
synchronized (clientSessions)
{
for (final ClientSession cs : clientSessions)
{
- if (cs == null || cs.isClosed())
+ if (cs == null)
continue;
- try
- {
- if (cs instanceof ClientSessionInternal)
- {
- ClientSessionInternal csi = ((ClientSessionInternal)cs);
- Channel channel = csi.getChannel();
- final CountDownLatch latch = new CountDownLatch(1);
- final Future<?> future = executorService.submit(new TerminateBlockedClientSession(channel, latch));
- csi.close();
- latch.countDown();
- future.cancel(false);
- }
- else
- {
- cs.close();
- }
- }
- catch (Exception e)
- {
- e.printStackTrace(); // no-op
- }
+ assertTrue(cs.isClosed());
}
clientSessions.clear();
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2012-02-15 16:40:29 UTC (rev 12125)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2012-02-16 11:39:06 UTC (rev 12126)
@@ -162,55 +162,23 @@
protected void tearDown() throws Exception
{
log.info("#test tearDown");
- // closeAllConsumers();
- for (ConsumerHolder ch : consumers)
- {
- addClientConsumer(ch.consumer);
- addClientSession(ch.session);
- }
- closeAllClientConsumers();
- closeAllClientSessions();
- closeAllSessionFactories();
- closeAllServerLocatorsFactories();
for (int i = 0; i < MAX_SERVERS; i++)
{
- if (servers[i] == null)
- continue;
- try
- {
- final ClusterManager clusterManager = servers[i].getClusterManager();
- if (clusterManager != null)
- {
- for (ClusterConnection cc : clusterManager.getClusterConnections())
- {
- cc.stop();
- }
- }
- }
- catch (Exception e)
- {
- // no-op
- }
- stopServers(i);
+ addHornetQComponent(nodeManagers[i]);
}
- for (int i = 0; i < MAX_SERVERS; i++)
- {
- stopComponent(nodeManagers[i]);
- }
- UnitTestCase.checkFreePort(ClusterTestBase.PORTS);
-
servers = null;
sfs = null;
- consumers = null;
-
consumers = new ConsumerHolder[ClusterTestBase.MAX_CONSUMERS];
nodeManagers = null;
super.tearDown();
+
+ UnitTestCase.checkFreePort(ClusterTestBase.PORTS);
+
}
// Private -------------------------------------------------------------------------------------------------------