[hornetq-commits] JBoss hornetq SVN: r12186 - in trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration: management and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Feb 23 09:58:14 EST 2012
Author: borges
Date: 2012-02-23 09:58:13 -0500 (Thu, 23 Feb 2012)
New Revision: 12186
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/BroadcastGroupControlUsingCoreTest.java
Log:
Improve tearDown() to avoid leaking threads.
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2012-02-23 14:57:50 UTC (rev 12185)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2012-02-23 14:58:13 UTC (rev 12186)
@@ -39,14 +39,12 @@
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
-import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -63,7 +61,7 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author Clebert Suconic
- *
+ *
* Created 14 Jan 2009 14:05:01
*
*
@@ -80,11 +78,11 @@
{
if (isNetty())
{
- return NettyConnectorFactory.class.getName();
+ return NETTY_CONNECTOR_FACTORY;
}
else
{
- return "org.hornetq.core.remoting.impl.invm.InVMConnectorFactory";
+ return INVM_CONNECTOR_FACTORY;
}
}
@@ -179,10 +177,10 @@
server1.start();
server0.start();
- locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
- ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+ ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
- ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+ ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
ClientSession session0 = sf0.createSession(false, true, true);
@@ -263,8 +261,8 @@
{
}
}
-
-
+
+
assertEquals(0, loadQueues(server0).size());
}
@@ -274,12 +272,12 @@
{
internalTestMessageLoss(false);
}
-
+
public void testLostMessageLargeMessage() throws Exception
{
internalTestMessageLoss(true);
}
-
+
/** This test will ignore messages
What will cause the bridge to fail with a timeout
The bridge should still recover the failure and reconnect on that case */
@@ -289,7 +287,7 @@
{
public boolean ignoreSends = true;
public CountDownLatch latch;
-
+
MyInterceptor(int numberOfIgnores)
{
latch = new CountDownLatch(numberOfIgnores);
@@ -312,11 +310,11 @@
return true;
}
}
-
+
}
-
+
MyInterceptor myInterceptor = new MyInterceptor(3);
-
+
HornetQServer server0 = null;
HornetQServer server1 = null;
ServerLocator locator = null;
@@ -368,7 +366,7 @@
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
-
+
bridgeConfiguration.setCallTimeout(500);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -386,11 +384,11 @@
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
server1.start();
-
+
server1.getRemotingService().addInterceptor(myInterceptor);
server0.start();
- locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
@@ -424,7 +422,7 @@
producer0.send(message);
}
-
+
myInterceptor.latch.await();
myInterceptor.ignoreSends = false;
@@ -477,10 +475,8 @@
{
}
}
-
-
- assertEquals(0, loadQueues(server0).size());
+ assertEquals("there should be no queues", 0, loadQueues(server0).size());
}
/**
@@ -601,7 +597,7 @@
server1.start();
server0.start();
- locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
@@ -659,7 +655,7 @@
ClientMessage message = consumer1.receive(2000);
Assert.assertNotNull(message);
-
+
Assert.assertEquals("goat", message.getStringProperty(selectorKey));
Assert.assertEquals(i, message.getObjectProperty(propKey));
@@ -671,9 +667,9 @@
readMessages(message);
}
}
-
+
session0.commit();
-
+
session1.commit();
Assert.assertNull(consumer1.receiveImmediate());
@@ -710,13 +706,13 @@
}
}
-
+
if (useFiles)
{
Map<Long, AtomicInteger> counters = loadQueues(server0);
assertEquals(1, counters.size());
Long key = counters.keySet().iterator().next();
-
+
AtomicInteger value = counters.get(key);
assertNotNull(value);
assertEquals(numMessages, counters.get(key).intValue());
@@ -785,7 +781,7 @@
server0.start();
- locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
ClientSession session0 = sf0.createSession(false, true, true);
@@ -881,7 +877,7 @@
}
}
-
+
assertEquals(0, loadQueues(server0).size());
@@ -946,7 +942,7 @@
server0.start();
- locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
ClientSession session0 = sf0.createSession(false, true, true);
@@ -1070,7 +1066,7 @@
}
}
-
+
assertEquals(0, loadQueues(server0).size());
@@ -1149,7 +1145,7 @@
server1.start();
server0.start();
- locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
@@ -1232,7 +1228,7 @@
}
}
-
+
assertEquals(0, loadQueues(server0).size());
@@ -1318,7 +1314,7 @@
{
try
{
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(server1tc);
+ ServerLocator locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server1tc));
ClientSessionFactory sf = locator.createSessionFactory();
@@ -1367,8 +1363,8 @@
@Override
public void run()
{
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(server0tc);
-
+ ServerLocator locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc));
+
locator.setBlockOnDurableSend(false);
locator.setBlockOnNonDurableSend(false);
@@ -1392,8 +1388,8 @@
ClientMessage message = session.createMessage(true);
message.putIntProperty("seq", i);
-
-
+
+
if (i % 100 == 0)
{
message.setPriority((byte)(RandomUtil.randomPositiveInt() % 9));
@@ -1433,7 +1429,7 @@
for (int repeat = 0 ; repeat < totalrepeats; repeat++)
{
ArrayList<Thread> threads = new ArrayList<Thread>();
-
+
threads.add(new ConsumerThread());
threads.add(new ProducerThread(numMessages / 2));
threads.add(new ProducerThread(numMessages / 2));
@@ -1442,12 +1438,12 @@
{
t.start();
}
-
+
for (Thread t : threads)
{
t.join();
}
-
+
assertEquals(0, errors.get());
}
}
@@ -1471,7 +1467,7 @@
}
}
-
+
assertEquals(0, loadQueues(server0).size());
@@ -1548,7 +1544,7 @@
server1.start();
server0.start();
- locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
@@ -1620,8 +1616,8 @@
{
}
}
-
-
+
+
assertEquals(0, loadQueues(server0).size());
@@ -1700,7 +1696,7 @@
server1.start();
server0.start();
- locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
@@ -1729,7 +1725,7 @@
producer0.send(message);
}
-
+
session0.commit();
for (int i = 0; i < numMessages; i++)
@@ -1739,9 +1735,9 @@
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getObjectProperty(propKey));
-
+
HornetQBuffer buff = message.getBodyBuffer();
-
+
for (int posMsg = 0 ; posMsg < LARGE_MESSAGE_SIZE; posMsg++)
{
assertEquals(getSamplebyte(posMsg), buff.readByte());
@@ -1749,7 +1745,7 @@
message.acknowledge();
}
-
+
session1.commit();
Assert.assertNull(consumer1.receiveImmediate());
@@ -1761,8 +1757,8 @@
sf0.close();
sf1.close();
-
-
+
+
}
finally
{
@@ -1786,7 +1782,7 @@
{
}
}
-
+
assertEquals(0, loadQueues(server0).size());
}
@@ -1864,7 +1860,7 @@
server1.start();
server0.start();
- locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
@@ -1938,7 +1934,7 @@
{
}
}
-
+
assertEquals(0, loadQueues(server0).size());
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2012-02-23 14:57:50 UTC (rev 12185)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2012-02-23 14:58:13 UTC (rev 12186)
@@ -21,7 +21,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
-import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.server.NodeManager;
@@ -81,15 +80,13 @@
{
params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + id);
- serviceConf.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),
- params));
+ serviceConf.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
}
else
{
params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME, id);
- serviceConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", params));
+ serviceConf.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params));
}
HornetQServer service;
if(nodeManager == null)
@@ -129,15 +126,13 @@
{
params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + id);
- serviceConf.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),
- params));
+ serviceConf.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
}
else
{
params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME, id);
- serviceConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", params));
+ serviceConf.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params));
}
HornetQServer service;
if(nodeManager == null)
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/BroadcastGroupControlUsingCoreTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/BroadcastGroupControlUsingCoreTest.java 2012-02-23 14:57:50 UTC (rev 12185)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/BroadcastGroupControlUsingCoreTest.java 2012-02-23 14:58:13 UTC (rev 12186)
@@ -13,15 +13,11 @@
package org.hornetq.tests.integration.management;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.BroadcastGroupControl;
import org.hornetq.api.core.management.ResourceNames;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
/**
* A BroadcastGroupControlUsingCoreTest
@@ -33,24 +29,12 @@
public class BroadcastGroupControlUsingCoreTest extends BroadcastGroupControlTest
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private ClientSession session;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // BroadcastGroupControlTest overrides --------------------------------
-
@Override
protected BroadcastGroupControl createManagementControl(final String name) throws Exception
{
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(InVMConnectorFactory.class.getName()));
- ClientSessionFactory sf = locator.createSessionFactory();
- session = sf.createSession(false, true, true);
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory());
+ final ClientSession session = addClientSession(sf.createSession(false, true, true));
session.start();
return new BroadcastGroupControl()
@@ -109,28 +93,4 @@
}
};
}
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void tearDown() throws Exception
- {
- if (session != null)
- {
- session.close();
- }
-
- session = null;
-
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
More information about the hornetq-commits mailing list