Author: borges
Date: 2011-12-05 11:35:33 -0500 (Mon, 05 Dec 2011)
New Revision: 11838
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionSendAcknowledgementHandlerTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionStopStartTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TransactionDurabilityTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TransactionalSendTest.java
Log:
Close resources at super.tearDown()
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java 2011-12-05 16:07:59 UTC (rev 11837)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java 2011-12-05 16:35:33 UTC (rev 11838)
@@ -30,7 +30,6 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
@@ -38,7 +37,7 @@
import org.hornetq.tests.util.ServiceTestBase;
/**
- *
+ *
* A ClientSessionFactoryTest
*
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
@@ -48,10 +47,9 @@
*/
public class SessionFactoryTest extends ServiceTestBase
{
- private static final Logger log = Logger.getLogger(SessionFactoryTest.class);
+ private final DiscoveryGroupConfiguration groupConfiguration =
+ new DiscoveryGroupConfiguration(getUDPDiscoveryAddress(), getUDPDiscoveryPort());
- private DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(getUDPDiscoveryAddress(), getUDPDiscoveryPort());
-
private HornetQServer liveService;
private TransportConfiguration liveTC;
@@ -64,20 +62,6 @@
startServer();
}
- @Override
- protected void tearDown() throws Exception
- {
- if (liveService != null)
- {
- liveService.stop();
- }
-
- liveService = null;
- liveTC = null;
-
- super.tearDown();
- }
-
public void testSerializable() throws Exception
{
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(InVMConnectorFactory.class.getName()));
@@ -101,7 +85,7 @@
Assert.assertNotNull(csi);
csi.close();
-
+
locator.close();
}
@@ -141,15 +125,15 @@
HornetQClient.DEFAULT_RETRY_INTERVAL,
HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
HornetQClient.DEFAULT_RECONNECT_ATTEMPTS);
-
+
ClientSessionFactory cf = locator.createSessionFactory();
ClientSession session = cf.createSession(false, true, true);
Assert.assertNotNull(session);
session.close();
testSettersThrowException(cf);
-
+
cf.close();
-
+
locator.close();
}
@@ -182,13 +166,13 @@
HornetQClient.DEFAULT_RETRY_INTERVAL,
HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
HornetQClient.DEFAULT_RECONNECT_ATTEMPTS);
-
+
ClientSessionFactory cf = locator.createSessionFactory();
ClientSession session = cf.createSession(false, true, true);
Assert.assertNotNull(session);
session.close();
testSettersThrowException(cf);
-
+
cf.close();
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionSendAcknowledgementHandlerTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionSendAcknowledgementHandlerTest.java 2011-12-05 16:07:59 UTC (rev 11837)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionSendAcknowledgementHandlerTest.java 2011-12-05 16:35:33 UTC (rev 11838)
@@ -20,8 +20,12 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
-import org.hornetq.core.logging.Logger;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.SendAcknowledgementHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -29,15 +33,13 @@
* A SendAcknowledgementsTest
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* Created 9 Feb 2009 13:29:19
*
*
*/
public class SessionSendAcknowledgementHandlerTest extends ServiceTestBase
{
- private static final Logger log = Logger.getLogger(SessionSendAcknowledgementHandlerTest.class);
-
private HornetQServer server;
private final SimpleString address = new SimpleString("address");
@@ -53,26 +55,13 @@
server.start();
}
- @Override
- protected void tearDown() throws Exception
- {
- if (server != null && server.isStarted())
- {
- server.stop();
- }
-
- server = null;
-
- super.tearDown();
- }
-
public void testSetInvalidSendACK() throws Exception
{
ServerLocator locator = createInVMNonHALocator();
locator.setConfirmationWindowSize(-1);
- ClientSessionFactory csf = locator.createSessionFactory();
+ ClientSessionFactory csf = createSessionFactory(locator);
ClientSession session = csf.createSession(null, null, false, true, true, false,
1);
try
@@ -119,7 +108,7 @@
locator.setConfirmationWindowSize(windowSize);
- ClientSessionFactory csf = locator.createSessionFactory();
+ ClientSessionFactory csf = createSessionFactory(locator);
ClientSession session = csf.createSession(null, null, false, true, true, false,
1);
session.createQueue(address, queueName, false);
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionStopStartTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionStopStartTest.java 2011-12-05 16:07:59 UTC (rev 11837)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionStopStartTest.java 2011-12-05 16:35:33 UTC (rev 11838)
@@ -18,7 +18,13 @@
import junit.framework.Assert;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -42,27 +48,13 @@
super.setUp();
server = createServer(false);
-
server.start();
-
locator = createInVMNonHALocator();
}
- @Override
- protected void tearDown() throws Exception
- {
- locator.close();
-
- server.stop();
-
- server = null;
-
- super.tearDown();
- }
-
public void testStopStartConsumerSyncReceiveImmediate() throws Exception
{
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory(locator);
final ClientSession session = sf.createSession(false, true, true);
@@ -106,7 +98,7 @@
public void testStopStartConsumerSyncReceive() throws Exception
{
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory(locator);
final ClientSession session = sf.createSession(false, true, true);
@@ -153,7 +145,7 @@
public void testStopStartConsumerAsyncSyncStoppedByHandler() throws Exception
{
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory(locator);
final ClientSession session = sf.createSession(false, true, true);
@@ -243,7 +235,7 @@
public void testStopStartConsumerAsyncSync() throws Exception
{
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory(locator);
final ClientSession session = sf.createSession(false, true, true);
@@ -341,7 +333,7 @@
public void testStopStartConsumerAsyncASyncStoppeeByHandler() throws Exception
{
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory(locator);
final ClientSession session = sf.createSession(false, true, true);
@@ -443,7 +435,7 @@
public void testStopStartConsumerAsyncASync() throws Exception
{
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory(locator);
final ClientSession session = sf.createSession(false, true, true);
@@ -560,7 +552,7 @@
public void testStopStartMultipleConsumers() throws Exception
{
locator.setConsumerWindowSize(getMessageEncodeSize(QUEUE) * 33);
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory(locator);
final ClientSession session = sf.createSession(false, true, true);
@@ -613,7 +605,7 @@
public void testStopStartAlreadyStartedSession() throws Exception
{
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory(locator);
final ClientSession session = sf.createSession(false, true, true);
@@ -654,7 +646,7 @@
public void testStopAlreadyStoppedSession() throws Exception
{
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory(locator);
final ClientSession session = sf.createSession(false, true, true);
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java 2011-12-05 16:07:59 UTC (rev 11837)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java 2011-12-05 16:35:33 UTC (rev 11838)
@@ -58,19 +58,10 @@
server.start();
}
- @Override
- protected void tearDown() throws Exception
- {
- stopComponent(server);
- closeSessionFactory(cf);
- closeServerLocator(locator);
- super.tearDown();
- }
-
public void testFailureListener() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
final CountDownLatch latch = new CountDownLatch(1);
clientSession.addFailureListener(new SessionFailureListener()
@@ -96,7 +87,7 @@
public void testFailureListenerRemoved() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
class MyFailureListener implements SessionFailureListener
{
@@ -130,7 +121,7 @@
long ttl = 500;
server.getConfiguration().setConnectionTTLOverride(ttl);
server.start();
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSessionInternal clientSession =
(ClientSessionInternal)cf.createSession(false, true, true);
clientSession.createQueue(queueName, queueName, false);
ClientProducer producer = clientSession.createProducer();
@@ -168,7 +159,7 @@
public void testBindingQuery() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
clientSession.createQueue("a1", "q1", false);
clientSession.createQueue("a1", "q2", false);
@@ -194,7 +185,7 @@
public void testQueueQuery() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
clientSession.createQueue("a1", queueName, false);
clientSession.createConsumer(queueName);
@@ -212,7 +203,7 @@
public void testQueueQueryWithFilter() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
clientSession.createQueue("a1", queueName, "foo=bar",
false);
clientSession.createConsumer(queueName);
@@ -227,7 +218,7 @@
public void testQueueQueryNoQ() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName));
Assert.assertFalse(resp.isExists());
@@ -237,7 +228,7 @@
public void testClose() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
clientSession.createQueue(queueName, queueName, false);
ClientProducer p = clientSession.createProducer();
@@ -254,7 +245,7 @@
public void testCreateMessageNonDurable() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
ClientMessage clientMessage = clientSession.createMessage(false);
Assert.assertFalse(clientMessage.isDurable());
@@ -263,7 +254,7 @@
public void testCreateMessageDurable() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
ClientMessage clientMessage = clientSession.createMessage(true);
Assert.assertTrue(clientMessage.isDurable());
@@ -272,7 +263,7 @@
public void testCreateMessageType() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
ClientMessage clientMessage = clientSession.createMessage((byte)99, false);
Assert.assertEquals((byte)99, clientMessage.getType());
@@ -281,7 +272,7 @@
public void testCreateMessageOverrides() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
ClientMessage clientMessage = clientSession.createMessage((byte)88, false, 100l,
300l, (byte)33);
Assert.assertEquals((byte)88, clientMessage.getType());
@@ -293,7 +284,7 @@
public void testGetVersion() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
Assert.assertEquals(server.getVersion().getIncrementingVersion(),
clientSession.getVersion());
clientSession.close();
@@ -301,7 +292,7 @@
public void testStart() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
clientSession.createQueue(queueName, queueName, false);
clientSession.start();
@@ -310,7 +301,7 @@
public void testStop() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
clientSession.createQueue(queueName, queueName, false);
clientSession.start();
@@ -320,7 +311,7 @@
public void testCommitWithSend() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, false, true);
clientSession.createQueue(queueName, queueName, false);
ClientProducer cp = clientSession.createProducer(queueName);
@@ -343,7 +334,7 @@
public void testRollbackWithSend() throws Exception
{
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, false, true);
clientSession.createQueue(queueName, queueName, false);
ClientProducer cp = clientSession.createProducer(queueName);
@@ -371,7 +362,7 @@
{
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession sendSession = cf.createSession(false, true, true);
ClientProducer cp = sendSession.createProducer(queueName);
ClientSession clientSession = cf.createSession(false, true, false);
@@ -430,7 +421,7 @@
{
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
- cf = locator.createSessionFactory();
+ cf = createSessionFactory(locator);
ClientSession sendSession = cf.createSession(false, true, true);
ClientProducer cp = sendSession.createProducer(queueName);
ClientSession clientSession = cf.createSession(false, true, false);
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java 2011-12-05 16:07:59 UTC (rev 11837)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java 2011-12-05 16:35:33 UTC (rev 11838)
@@ -14,14 +14,18 @@
import junit.framework.Assert;
-import org.hornetq.api.core.client.*;
-import org.hornetq.core.logging.Logger;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
/**
- *
+ *
* A SimpleSendMultipleQueues
*
* @author Tim Fox
@@ -30,8 +34,6 @@
*/
public class SimpleSendMultipleQueues extends ServiceTestBase
{
- private static final Logger log = Logger.getLogger(SimpleSendMultipleQueues.class);
-
public static final String address = "testaddress";
public static final String queueName = "testqueue";
@@ -89,7 +91,7 @@
locator = createNettyNonHALocator();
- ClientSessionFactory cf = locator.createSessionFactory();
+ ClientSessionFactory cf = createSessionFactory(locator);
session = cf.createSession();
@@ -124,15 +126,8 @@
session.deleteQueue("queue3");
session.close();
-
- locator.close();
}
- if (server.isStarted())
- {
- server.stop();
- }
-
super.tearDown();
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2011-12-05 16:07:59 UTC (rev 11837)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2011-12-05 16:35:33 UTC (rev 11838)
@@ -32,7 +32,6 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ClientProducerCreditsImpl;
import org.hornetq.core.client.impl.ClientProducerImpl;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
@@ -105,8 +104,8 @@
session.close();
}
-
+
public void testMemoryLeakOnAddressSettingForTemporaryQueue() throws Exception
{
for (int i = 0 ; i < 1000; i++)
@@ -114,21 +113,21 @@
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString address = RandomUtil.randomSimpleString();
session.createTemporaryQueue(address, queue);
-
+
session.close();
session = sf.createSession();
}
-
-
+
+
session.close();
-
+
sf.close();
-
+
System.out.println("size = " +
server.getAddressSettingsRepository().getCacheSize());
-
+
assertTrue(server.getAddressSettingsRepository().getCacheSize() < 10);
}
-
+
public void testPaginStoreIsRemovedWhenQueueIsDeleted() throws Exception
{
SimpleString queue = RandomUtil.randomSimpleString();
@@ -148,7 +147,7 @@
message.acknowledge();
SimpleString[] storeNames =
server.getPostOffice().getPagingManager().getStoreNames();
- assertTrue(Arrays.asList(storeNames).contains(address));
+ assertTrue(Arrays.asList(storeNames).contains(address));
consumer.close();
session.deleteQueue(queue);
@@ -158,7 +157,7 @@
session.close();
}
-
+
public void testConsumeFromTemporaryQueueCreatedByOtherSession() throws Exception
{
SimpleString queue = RandomUtil.randomSimpleString();
@@ -204,7 +203,7 @@
// wait for the closing listeners to be fired
Assert.assertTrue("connection close listeners not fired", latch.await(2 *
TemporaryQueueTest.CONNECTION_TTL,
TimeUnit.MILLISECONDS));
-
+
sf = locator.createSessionFactory();
session = sf.createSession(false, true, true);
session.start();
@@ -228,37 +227,37 @@
session.createQueue("a.b", "queue1");
session.createTemporaryQueue("a.#", "queue2");
session.createTemporaryQueue("a.#", "queue3");
-
+
ClientProducer producer = session.createProducer("a.b");
producer.send(session.createMessage(false));
-
+
ClientConsumer cons = session.createConsumer("queue2");
session.start();
-
+
ClientMessage msg = cons.receive(5000);
-
+
assertNotNull(msg);
-
+
msg.acknowledge();
-
+
cons.close();
cons = session.createConsumer("queue3");
session.start();
-
+
msg = cons.receive(5000);
-
+
assertNotNull(msg);
-
+
msg.acknowledge();
-
+
cons.close();
session.deleteQueue("queue2");
session.deleteQueue("queue3");
-
+
session.close();
}
@@ -268,37 +267,37 @@
session.createQueue("a.b", "queue1");
session.createTemporaryQueue("a.#", "queue2");
session.createTemporaryQueue("a.#", "queue3");
-
+
ClientProducer producer = session.createProducer("a.b");
producer.send(session.createMessage(false));
-
+
ClientConsumer cons = session.createConsumer("queue2");
session.start();
-
+
ClientMessage msg = cons.receive(5000);
-
+
assertNotNull(msg);
-
+
msg.acknowledge();
-
+
cons.close();
cons = session.createConsumer("queue3");
session.start();
-
+
msg = cons.receive(5000);
-
+
assertNotNull(msg);
-
+
msg.acknowledge();
-
+
cons.close();
session.deleteQueue("queue2");
session.deleteQueue("queue3");
-
+
session.close();
}
@@ -312,7 +311,7 @@
}
/**
- * @see org.hornetq.core.server.impl.ServerSessionImpl#doHandleCreateQueue(org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage)
+ * @see org.hornetq.core.server.impl.ServerSessionImpl#doHandleCreateQueue(org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage)
*/
public void testDeleteTemporaryQueueAfterConnectionIsClosed_2() throws Exception
{
@@ -333,11 +332,11 @@
session2.start();
- ClientConsumer consumer = session2.createConsumer(queue);
+ session2.createConsumer(queue);
session2.close();
}
-
+
public void testRecreateConsumerOverServerFailure() throws Exception
{
ServerLocator serverWithReattach = createLocator();
@@ -345,60 +344,60 @@
serverWithReattach.setRetryInterval(1000);
serverWithReattach.setConfirmationWindowSize(-1);
ClientSessionFactory reattachSF = serverWithReattach.createSessionFactory();
-
+
ClientSession session = reattachSF.createSession(false, false);
session.createTemporaryQueue("tmpAd", "tmpQ");
ClientConsumer consumer = session.createConsumer("tmpQ");
-
+
ClientProducer prod = session.createProducer("tmpAd");
-
+
session.start();
-
+
RemotingConnectionImpl conn =
(RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
conn.fail(new HornetQException(HornetQException.IO_ERROR));
-
+
prod.send(session.createMessage(false));
session.commit();
-
+
assertNotNull(consumer.receive(1000));
-
+
session.close();
-
+
reattachSF.close();
-
+
serverWithReattach.close();
-
-
+
+
}
-
+
public void testTemoraryQueuesWithFilter() throws Exception
{
-
+
int countTmpQueue=0;
-
+
final AtomicInteger errors = new AtomicInteger(0);
-
+
class MyHandler implements MessageHandler
{
final String color;
-
+
final CountDownLatch latch;
-
+
final ClientSession sess;
-
+
public MyHandler(ClientSession sess, String color, int expectedMessages)
{
this.sess = sess;
latch = new CountDownLatch(expectedMessages);
this.color = color;
}
-
+
public boolean waitCompletion() throws Exception
{
return latch.await(10, TimeUnit.SECONDS);
}
-
+
public void onMessage(ClientMessage message)
{
try
@@ -406,7 +405,7 @@
message.acknowledge();
sess.commit();
latch.countDown();
-
+
if (!message.getStringProperty("color").equals(color))
{
log.warn("Unexpected color " +
message.getStringProperty("color") + " when we were expecting " +
color);
@@ -419,28 +418,28 @@
errors.incrementAndGet();
}
}
-
+
}
-
+
String address = "AD_test";
int iterations = 100;
int msgs = 100;
-
+
// Will be using a single Session as this is how an issue was raised
for (int i = 0 ; i < iterations; i++)
{
ClientSessionFactory clientsConnecton = locator.createSessionFactory();
ClientSession localSession = clientsConnecton.createSession();
-
+
ClientProducer prod = localSession.createProducer(address);
-
+
localSession.start();
-
+
log.info("Iteration " + i);
String queueRed = address + "_red_" + (countTmpQueue++);
String queueBlue = address + "_blue_" + (countTmpQueue++);
-
+
//ClientSession sessConsumerRed = clientsConnecton.createSession();
ClientSession sessConsumerRed = localSession;
sessConsumerRed.createTemporaryQueue(address, queueRed,
"color='red'");
@@ -448,7 +447,7 @@
ClientConsumer redClientConsumer = sessConsumerRed.createConsumer(queueRed);
redClientConsumer.setMessageHandler(redHandler);
//sessConsumerRed.start();
-
+
//ClientSession sessConsumerBlue = clientsConnecton.createSession();
ClientSession sessConsumerBlue = localSession;
sessConsumerBlue.createTemporaryQueue(address, queueBlue,
"color='blue'");
@@ -456,7 +455,7 @@
ClientConsumer blueClientConsumer = sessConsumerBlue.createConsumer(queueBlue);
blueClientConsumer.setMessageHandler(blueHandler);
//sessConsumerBlue.start();
-
+
try
{
ClientMessage msgBlue = session.createMessage(false);
@@ -468,17 +467,17 @@
for (int nmsg = 0; nmsg < msgs; nmsg++)
{
prod.send(msgBlue);
-
+
prod.send(msgRed);
-
+
session.commit();
}
-
+
blueHandler.waitCompletion();
redHandler.waitCompletion();
-
+
assertEquals(0, errors.get());
-
+
}
finally
{
@@ -488,14 +487,14 @@
clientsConnecton.close();
}
}
-
+
}
public void testDeleteTemporaryQueueWhenClientCrash() throws Exception
{
session.close();
sf.close();
-
+
final SimpleString queue = RandomUtil.randomSimpleString();
SimpleString address = RandomUtil.randomSimpleString();
@@ -568,33 +567,34 @@
locator2.close();
}
-
+
public void testBlockingWithTemporaryQueue() throws Exception
{
-
+
AddressSettings setting = new AddressSettings();
setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
setting.setMaxSizeBytes(1024 * 1024);
-
+
server.getAddressSettingsRepository().addMatch("TestAD", setting);
-
+
ClientSessionFactory consumerCF = locator.createSessionFactory();
ClientSession consumerSession = consumerCF.createSession(true, true);
consumerSession.addMetaData("consumer", "consumer");
consumerSession.createTemporaryQueue("TestAD", "Q1");
- ClientConsumer consumer = consumerSession.createConsumer("Q1");
+ consumerSession.createConsumer("Q1");
consumerSession.start();
-
+
final ClientProducerImpl prod =
(ClientProducerImpl)session.createProducer("TestAD");
-
+
final AtomicInteger errors = new AtomicInteger(0);
-
+
final AtomicInteger msgs = new AtomicInteger(0);
-
+
final int TOTAL_MSG = 1000;
-
+
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -612,23 +612,23 @@
e.printStackTrace();
errors.incrementAndGet();
}
-
+
System.out.println("done");
}
};
-
+
t.start();
while (msgs.get() == 0)
{
Thread.sleep(100);
}
-
+
while (t.isAlive() && errors.get() == 0 &&
!prod.getProducerCredits().isBlocked())
{
Thread.sleep(100);
}
-
+
assertEquals(0, errors.get());
ClientSessionFactory newConsumerCF = locator.createSessionFactory();
@@ -636,7 +636,7 @@
newConsumerSession.createTemporaryQueue("TestAD", "Q2");
ClientConsumer newConsumer = newConsumerSession.createConsumer("Q2");
newConsumerSession.start();
-
+
int toReceive = TOTAL_MSG - msgs.get() - 1;
for (ServerSession sessionIterator: server.getSessions())
@@ -648,24 +648,24 @@
impl.getRemotingConnection().fail(new
HornetQException(HornetQException.DISCONNECTED, "failure e"));
}
}
-
+
int secondReceive = 0;
-
+
ClientMessage msg = null;
while (secondReceive < toReceive && (msg = newConsumer.receive(5000)) !=
null)
{
msg.acknowledge();
secondReceive++;
}
-
+
assertNull(newConsumer.receiveImmediate());
-
+
assertEquals(toReceive, secondReceive);
-
+
t.join();
-
-
-
+
+
+
}
// Package protected ---------------------------------------------
@@ -683,41 +683,19 @@
server.start();
locator = createLocator();
- sf = locator.createSessionFactory();
- session = sf.createSession(false, true, true);
+ sf = createSessionFactory(locator);
+ session = addClientSession(sf.createSession(false, true, true));
}
protected ServerLocator createLocator()
{
ServerLocator retlocator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ addServerLocator(retlocator);
retlocator.setConnectionTTL(TemporaryQueueTest.CONNECTION_TTL);
retlocator.setClientFailureCheckPeriod(TemporaryQueueTest.CONNECTION_TTL / 3);
return retlocator;
}
- @Override
- protected void tearDown() throws Exception
- {
-
- sf.close();
-
- session.close();
-
- locator.close();
-
- locator = null;
-
- server.stop();
-
- session = null;
-
- server = null;
-
- sf = null;
-
- super.tearDown();
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TransactionDurabilityTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TransactionDurabilityTest.java 2011-12-05 16:07:59 UTC (rev 11837)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TransactionDurabilityTest.java 2011-12-05 16:35:33 UTC (rev 11838)
@@ -16,29 +16,33 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
/**
- *
+ *
* A TransactionDurabilityTest
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* Created 16 Jan 2009 11:00:33
*
*
*/
public class TransactionDurabilityTest extends ServiceTestBase
{
- private static final Logger log = Logger.getLogger(TransactionDurabilityTest.class);
/*
* This tests the following situation:
- *
+ *
* (With the old implementation)
* Currently when a new persistent message is routed to persistent queues, the message
is first stored, then the message is routed.
* Let's say it has been routed to two different queues A, B.
@@ -46,10 +50,10 @@
* transactionally, but it's not committed yet.
* Ref R2 then gets consumed and acknowledged by non transacted session S2, this
causes a delete record to be written to storage.
* R1 then rolls back, and the server is restarted - unfortunatelt since the delete
record was written R1 is not ready to be consumed again.
- *
+ *
* It's therefore crucial the messages aren't deleted from storage until AFTER
any ack records are committed to storage.
- *
- *
+ *
+ *
*/
public void testRolledBackAcknowledgeWithSameMessageAckedByOtherSession() throws
Exception
{
@@ -65,9 +69,11 @@
server.start();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ ServerLocator locator =
+ addServerLocator(HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(
+ ServiceTestBase.INVM_CONNECTOR_FACTORY)));
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session1 = sf.createSession(false, true, true);
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TransactionalSendTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TransactionalSendTest.java 2011-12-05 16:07:59 UTC (rev 11837)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TransactionalSendTest.java 2011-12-05 16:35:33 UTC (rev 11838)
@@ -42,25 +42,14 @@
protected void setUp() throws Exception
{
super.setUp();
-
locator = createInVMNonHALocator();
}
- @Override
- protected void tearDown() throws Exception
- {
- locator.close();
-
- super.tearDown();
- }
-
public void testSendWithCommit() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ ClientSessionFactory cf = createSessionFactory(locator);
ClientSession session = cf.createSession(false, false, false);
session.createQueue(addressA, queueA, false);
ClientProducer cp = session.createProducer(addressA);
@@ -82,23 +71,13 @@
session.commit();
Assert.assertEquals(q.getMessageCount(), numMessages * 2);
session.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
}
- }
- }
public void testSendWithRollback() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ ClientSessionFactory cf = createSessionFactory(locator);
ClientSession session = cf.createSession(false, false, false);
session.createQueue(addressA, queueA, false);
ClientProducer cp = session.createProducer(addressA);
@@ -120,14 +99,6 @@
session.commit();
Assert.assertEquals(q.getMessageCount(), numMessages);
session.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
}
- }
- }
}