Author: ataylor
Date: 2011-12-05 09:18:54 -0500 (Mon, 05 Dec 2011)
New Revision: 11830
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
Log:
test suite refactor
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-12-05 14:14:43 UTC (rev 11829)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-12-05 14:18:54 UTC (rev 11830)
@@ -71,14 +71,6 @@
locator = createFactory(isNetty());
}
- @Override
- protected void tearDown() throws Exception
- {
- locator.close();
-
- super.tearDown();
- }
-
private int getMessageEncodeSize(final SimpleString address) throws Exception
{
ServerLocator locator = createInVMNonHALocator();
@@ -97,62 +89,55 @@
public void testReceiveImmediateWithZeroWindow() throws Exception
{
HornetQServer server = createServer(false, isNetty());
- try
- {
- server.start();
- locator.setConsumerWindowSize(0);
+ server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ locator.setConsumerWindowSize(0);
- ClientSession session = sf.createSession(false, false, false);
- session.createQueue("testWindow", "testWindow", true);
- session.close();
+ ClientSessionFactory sf = locator.createSessionFactory();
- int numConsumers = 5;
+ ClientSession session = sf.createSession(false, false, false);
+ session.createQueue("testWindow", "testWindow", true);
+ session.close();
- ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
- ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
- for (int i = 0; i < numConsumers; i++)
- {
- ClientSession session1 = sf.createSession();
- ClientConsumer consumer = session1.createConsumer("testWindow");
- consumers.add(consumer);
- session1.start();
- sessions.add(session1);
- consumer.receiveImmediate();
+ int numConsumers = 5;
- }
+ ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
+ ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
+ for (int i = 0; i < numConsumers; i++)
+ {
+ ClientSession session1 = sf.createSession();
+ ClientConsumer consumer = session1.createConsumer("testWindow");
+ consumers.add(consumer);
+ session1.start();
+ sessions.add(session1);
+ consumer.receiveImmediate();
- ClientSession senderSession = sf.createSession(false, false);
+ }
- ClientProducer producer = senderSession.createProducer("testWindow");
+ ClientSession senderSession = sf.createSession(false, false);
- ClientMessage sent = senderSession.createMessage(true);
- sent.putStringProperty("hello", "world");
- producer.send(sent);
+ ClientProducer producer = senderSession.createProducer("testWindow");
- senderSession.commit();
+ ClientMessage sent = senderSession.createMessage(true);
+ sent.putStringProperty("hello", "world");
+ producer.send(sent);
- senderSession.start();
+ senderSession.commit();
- ClientConsumer consumer = consumers.get(2);
- ClientMessage received = consumer.receive(1000);
- assertNotNull(received);
+ senderSession.start();
- for (ClientSession tmpSess : sessions)
- {
- tmpSess.close();
- }
+ ClientConsumer consumer = consumers.get(2);
+ ClientMessage received = consumer.receive(1000);
+ assertNotNull(received);
- senderSession.close();
-
- }
- finally
+ for (ClientSession tmpSess : sessions)
{
- server.stop();
+ tmpSess.close();
}
+ senderSession.close();
+
}
// https://jira.jboss.org/jira/browse/HORNETQ-385
@@ -204,7 +189,6 @@
finally
{
locator.close();
- server.stop();
}
}
@@ -212,125 +196,109 @@
public void testReceiveImmediateWithZeroWindow3() throws Exception
{
HornetQServer server = createServer(false, isNetty());
- try
- {
- server.start();
- locator.setConsumerWindowSize(0);
+ server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ locator.setConsumerWindowSize(0);
- ClientSession session = sf.createSession(false, false, false);
- session.createQueue("testWindow", "testWindow", true);
- session.close();
+ ClientSessionFactory sf = locator.createSessionFactory();
- int numConsumers = 5;
+ ClientSession session = sf.createSession(false, false, false);
+ session.createQueue("testWindow", "testWindow", true);
+ session.close();
- ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
- ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
- for (int i = 0; i < numConsumers; i++)
- {
- ClientSession session1 = sf.createSession();
- ClientConsumer consumer = session1.createConsumer("testWindow");
- consumers.add(consumer);
- session1.start();
- sessions.add(session1);
- consumer.receive(10);
+ int numConsumers = 5;
- }
+ ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
+ ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
+ for (int i = 0; i < numConsumers; i++)
+ {
+ ClientSession session1 = sf.createSession();
+ ClientConsumer consumer = session1.createConsumer("testWindow");
+ consumers.add(consumer);
+ session1.start();
+ sessions.add(session1);
+ consumer.receive(10);
- ClientSession senderSession = sf.createSession(false, false);
+ }
- ClientProducer producer = senderSession.createProducer("testWindow");
+ ClientSession senderSession = sf.createSession(false, false);
- ClientMessage sent = senderSession.createMessage(true);
- sent.putStringProperty("hello", "world");
+ ClientProducer producer = senderSession.createProducer("testWindow");
- producer.send(sent);
+ ClientMessage sent = senderSession.createMessage(true);
+ sent.putStringProperty("hello", "world");
- senderSession.commit();
+ producer.send(sent);
- senderSession.start();
+ senderSession.commit();
- ClientConsumer consumer = consumers.get(2);
- ClientMessage received = consumer.receive(1000);
- assertNotNull(received);
+ senderSession.start();
- for (ClientSession tmpSess : sessions)
- {
- tmpSess.close();
- }
+ ClientConsumer consumer = consumers.get(2);
+ ClientMessage received = consumer.receive(1000);
+ assertNotNull(received);
- senderSession.close();
-
- }
- finally
+ for (ClientSession tmpSess : sessions)
{
- server.stop();
+ tmpSess.close();
}
+ senderSession.close();
}
public void testReceiveImmediateWithZeroWindow4() throws Exception
{
HornetQServer server = createServer(false, isNetty());
- try
- {
- server.start();
- locator.setConsumerWindowSize(0);
+ server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ locator.setConsumerWindowSize(0);
- ClientSession session = sf.createSession(false, false, false);
- session.createQueue("testWindow", "testWindow", true);
- session.close();
+ ClientSessionFactory sf = locator.createSessionFactory();
- int numConsumers = 5;
+ ClientSession session = sf.createSession(false, false, false);
+ session.createQueue("testWindow", "testWindow", true);
+ session.close();
- ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
- ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
- for (int i = 0; i < numConsumers; i++)
- {
- ClientSession session1 = sf.createSession();
- ClientConsumer consumer = session1.createConsumer("testWindow");
- consumers.add(consumer);
- session1.start();
- sessions.add(session1);
- consumer.receive(10);
+ int numConsumers = 5;
- }
+ ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
+ ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
+ for (int i = 0; i < numConsumers; i++)
+ {
+ ClientSession session1 = sf.createSession();
+ ClientConsumer consumer = session1.createConsumer("testWindow");
+ consumers.add(consumer);
+ session1.start();
+ sessions.add(session1);
+ consumer.receive(10);
- ClientSession senderSession = sf.createSession(false, false);
+ }
- ClientProducer producer = senderSession.createProducer("testWindow");
+ ClientSession senderSession = sf.createSession(false, false);
- ClientMessage sent = senderSession.createMessage(true);
- sent.putStringProperty("hello", "world");
+ ClientProducer producer = senderSession.createProducer("testWindow");
- producer.send(sent);
+ ClientMessage sent = senderSession.createMessage(true);
+ sent.putStringProperty("hello", "world");
- senderSession.commit();
+ producer.send(sent);
- senderSession.start();
+ senderSession.commit();
- ClientConsumer consumer = consumers.get(2);
- ClientMessage received = consumer.receiveImmediate();
- assertNotNull(received);
+ senderSession.start();
- for (ClientSession tmpSess : sessions)
- {
- tmpSess.close();
- }
+ ClientConsumer consumer = consumers.get(2);
+ ClientMessage received = consumer.receiveImmediate();
+ assertNotNull(received);
- senderSession.close();
-
- }
- finally
+ for (ClientSession tmpSess : sessions)
{
- server.stop();
+ tmpSess.close();
}
+ senderSession.close();
}
/*
@@ -342,55 +310,45 @@
{
HornetQServer messagingService = createServer(false, isNetty());
locator.setBlockOnNonDurableSend(false);
- try
- {
- messagingService.start();
- int numMessage = 100;
- locator.setConsumerWindowSize(numMessage * getMessageEncodeSize(addressA));
- ClientSessionFactory cf = locator.createSessionFactory();
- ClientSession sendSession = cf.createSession(false, true, true);
- ClientSession receiveSession = cf.createSession(false, true, true);
- sendSession.createQueue(addressA, queueA, false);
- ClientConsumer receivingConsumer = receiveSession.createConsumer(queueA);
- ClientSession session = cf.createSession(false, true, true);
- ClientProducer cp = sendSession.createProducer(addressA);
- ClientConsumer cc = session.createConsumer(queueA);
- session.start();
- receiveSession.start();
- for (int i = 0; i < numMessage * 4; i++)
- {
- cp.send(sendSession.createMessage(false));
- }
+ messagingService.start();
+ int numMessage = 100;
+ locator.setConsumerWindowSize(numMessage * getMessageEncodeSize(addressA));
+ ClientSessionFactory cf = locator.createSessionFactory();
+ ClientSession sendSession = cf.createSession(false, true, true);
+ ClientSession receiveSession = cf.createSession(false, true, true);
+ sendSession.createQueue(addressA, queueA, false);
+ ClientConsumer receivingConsumer = receiveSession.createConsumer(queueA);
- for (int i = 0; i < numMessage * 2; i++)
- {
- ClientMessage m = receivingConsumer.receive(5000);
- Assert.assertNotNull(m);
- m.acknowledge();
- }
- receiveSession.close();
+ ClientSession session = cf.createSession(false, true, true);
+ ClientProducer cp = sendSession.createProducer(addressA);
+ ClientConsumer cc = session.createConsumer(queueA);
+ session.start();
+ receiveSession.start();
+ for (int i = 0; i < numMessage * 4; i++)
+ {
+ cp.send(sendSession.createMessage(false));
+ }
- for (int i = 0; i < numMessage * 2; i++)
- {
- ClientMessage m = cc.receive(5000);
- Assert.assertNotNull(m);
- m.acknowledge();
- }
-
- session.close();
- sendSession.close();
-
- Assert.assertEquals(0, getMessageCount(messagingService, queueA.toString()));
-
+ for (int i = 0; i < numMessage * 2; i++)
+ {
+ ClientMessage m = receivingConsumer.receive(5000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
}
- finally
+ receiveSession.close();
+
+ for (int i = 0; i < numMessage * 2; i++)
{
- if (messagingService.isStarted())
- {
- messagingService.stop();
- }
+ ClientMessage m = cc.receive(5000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
}
+
+ session.close();
+ sendSession.close();
+
+ Assert.assertEquals(0, getMessageCount(messagingService, queueA.toString()));
}
public void testSlowConsumerBufferingOne() throws Exception
@@ -468,11 +426,6 @@
catch (Exception ignored)
{
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
@@ -600,11 +553,6 @@
catch (Exception ignored)
{
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
@@ -796,11 +744,6 @@
catch (Exception ignored)
{
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
@@ -875,11 +818,6 @@
catch (Exception ignored)
{
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
@@ -1011,11 +949,6 @@
catch (Exception ignored)
{
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
@@ -1023,7 +956,6 @@
public void internalTestSlowConsumerOnMessageHandlerNoBuffers(final boolean largeMessages) throws Exception
{
-
HornetQServer server = createServer(false, isNetty());
ClientSession sessionB = null;
@@ -1175,11 +1107,6 @@
catch (Exception ignored)
{
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
@@ -1347,11 +1274,6 @@
{
ignored.printStackTrace();
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
@@ -1485,11 +1407,6 @@
catch (Exception ignored)
{
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java 2011-12-05 14:14:43 UTC (rev 11829)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java 2011-12-05 14:18:54 UTC (rev 11830)
@@ -257,11 +257,6 @@
session.close();
}
- if (server.isStarted())
- {
- server.stop();
- }
-
super.tearDown();
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-12-05 14:14:43 UTC (rev 11829)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-12-05 14:18:54 UTC (rev 11830)
@@ -79,13 +79,6 @@
locator = createFactory(isNetty());
}
- @Override
- protected void tearDown() throws Exception
- {
- locator.close();
- super.tearDown();
- }
-
protected boolean isNetty()
{
return false;
@@ -101,7 +94,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.getConfiguration()
.getInterceptorClassNames()
@@ -144,14 +137,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -170,7 +155,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
// server.getConfiguration()
// .getInterceptorClassNames()
@@ -231,14 +216,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -257,7 +234,7 @@
try
{
- server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
+ HornetQServer server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
// server.getConfiguration()
// .getInterceptorClassNames()
@@ -340,14 +317,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -366,7 +335,7 @@
try
{
- server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
+ HornetQServer server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
// server.getConfiguration()
// .getInterceptorClassNames()
@@ -484,14 +453,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2011-12-05 14:14:43 UTC (rev 11829)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2011-12-05 14:18:54 UTC (rev 11830)
@@ -27,6 +27,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
@@ -65,7 +66,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -108,14 +109,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -132,7 +125,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -187,14 +180,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -211,7 +196,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -264,14 +249,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -290,7 +267,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -369,14 +346,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-12-05 14:14:43 UTC (rev 11829)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-12-05 14:18:54 UTC (rev 11830)
@@ -107,107 +107,93 @@
final ClientSession session;
- try
- {
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
- AddressSettings settings = new AddressSettings();
- if (redeliveryDelay)
+ AddressSettings settings = new AddressSettings();
+ if (redeliveryDelay)
+ {
+ settings.setRedeliveryDelay(1000);
+ if (locator.isCompressLargeMessage())
{
- settings.setRedeliveryDelay(1000);
- if (locator.isCompressLargeMessage())
- {
- locator.setConsumerWindowSize(0);
- }
+ locator.setConsumerWindowSize(0);
}
- settings.setMaxDeliveryAttempts(-1);
+ }
+ settings.setMaxDeliveryAttempts(-1);
- server.getAddressSettingsRepository().addMatch("#", settings);
+ server.getAddressSettingsRepository().addMatch("#", settings);
- server.start();
+ server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
- session = sf.createSession(false, false, false);
+ session = sf.createSession(false, false, false);
- session.createQueue(ADDRESS, ADDRESS, true);
+ session.createQueue(ADDRESS, ADDRESS, true);
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- for (int i = 0 ; i < 20; i++)
- {
- Message clientFile = createLargeClientMessage(session, messageSize, true);
+ for (int i = 0 ; i < 20; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
- clientFile.putIntProperty("value", i);
+ clientFile.putIntProperty("value", i);
- producer.send(clientFile);
- }
+ producer.send(clientFile);
+ }
- session.commit();
+ session.commit();
- session.start();
+ session.start();
- final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latch = new CountDownLatch(1);
- final AtomicInteger errors = new AtomicInteger(0);
+ final AtomicInteger errors = new AtomicInteger(0);
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
- consumer.setMessageHandler(new MessageHandler()
+ consumer.setMessageHandler(new MessageHandler()
+ {
+ int counter = 0;
+ public void onMessage(ClientMessage message)
{
- int counter = 0;
- public void onMessage(ClientMessage message)
+ message.getBodyBuffer().readByte();
+ // System.out.println("message:" + message);
+ try
{
- message.getBodyBuffer().readByte();
- // System.out.println("message:" + message);
- try
+ if (counter ++ < 20)
{
- if (counter ++ < 20)
- {
- Thread.sleep(100);
- // System.out.println("Rollback");
- message.acknowledge();
- session.rollback();
- }
- else
- {
- message.acknowledge();
- session.commit();
- }
-
- if (counter == 40)
- {
- latch.countDown();
- }
+ Thread.sleep(100);
+ // System.out.println("Rollback");
+ message.acknowledge();
+ session.rollback();
}
- catch (Exception e)
+ else
{
+ message.acknowledge();
+ session.commit();
+ }
+
+ if (counter == 40)
+ {
latch.countDown();
- e.printStackTrace();
- errors.incrementAndGet();
}
}
- });
+ catch (Exception e)
+ {
+ latch.countDown();
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ });
- assertTrue(latch.await(40, TimeUnit.SECONDS));
+ assertTrue(latch.await(40, TimeUnit.SECONDS));
- consumer.close();
+ consumer.close();
- session.close();
+ session.close();
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
+ validateNoFilesOnLargeDir();
}
public void testCloseConsumer() throws Exception
@@ -218,7 +204,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -268,14 +254,6 @@
catch (Throwable ignored)
{
}
-
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
}
}
@@ -305,7 +283,7 @@
config.setJournalBufferSize_AIO(10 * 1024);
config.setJournalBufferSize_NIO(10 * 1024);
- server = createServer(true, config);
+ HornetQServer server = createServer(true, config);
server.start();
@@ -362,14 +340,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -386,7 +356,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -489,14 +459,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -513,7 +475,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -572,14 +534,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -596,7 +550,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -708,14 +662,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -732,7 +678,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -843,14 +789,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -867,7 +805,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -950,14 +888,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -983,7 +913,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -1039,14 +969,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -1071,7 +993,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -1126,14 +1048,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -2002,73 +1916,58 @@
// there are two bindings.. one is ACKed, the other is not, the server is restarted
// The other binding is acked... The file must be deleted
- try
- {
+ HornetQServer server = createServer(true, isNetty());
- server = createServer(true, isNetty());
+ server.start();
- server.start();
+ SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
- SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
+ ClientSessionFactory sf = locator.createSessionFactory();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+ session.createQueue(LargeMessageTest.ADDRESS, queue[0], null, true);
+ session.createQueue(LargeMessageTest.ADDRESS, queue[1], null, true);
- session.createQueue(LargeMessageTest.ADDRESS, queue[0], null, true);
- session.createQueue(LargeMessageTest.ADDRESS, queue[1], null, true);
+ int numberOfBytes = 400000;
- int numberOfBytes = 400000;
+ Message clientFile = createLargeClientMessage(session, numberOfBytes);
- Message clientFile = createLargeClientMessage(session, numberOfBytes);
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ session.start();
- session.start();
+ producer.send(clientFile);
- producer.send(clientFile);
+ producer.close();
- producer.close();
+ ClientConsumer consumer = session.createConsumer(queue[1]);
+ ClientMessage msg = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ msg.getBodyBuffer().readByte();
+ Assert.assertNull(consumer.receiveImmediate());
+ Assert.assertNotNull(msg);
- ClientConsumer consumer = session.createConsumer(queue[1]);
- ClientMessage msg = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
- msg.getBodyBuffer().readByte();
- Assert.assertNull(consumer.receiveImmediate());
- Assert.assertNotNull(msg);
+ msg.acknowledge();
+ consumer.close();
- msg.acknowledge();
- consumer.close();
+ log.debug("Stopping");
- log.debug("Stopping");
+ session.stop();
- session.stop();
+ ClientConsumer consumer1 = session.createConsumer(queue[0]);
- ClientConsumer consumer1 = session.createConsumer(queue[0]);
+ session.start();
- session.start();
+ msg = consumer1.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ Assert.assertNotNull(msg);
+ msg.acknowledge();
+ consumer1.close();
- msg = consumer1.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
- Assert.assertNotNull(msg);
- msg.acknowledge();
- consumer1.close();
+ session.commit();
- session.commit();
+ session.close();
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
+ validateNoFilesOnLargeDir();
}
public void testTwoBindingsAndRestart() throws Exception
@@ -2085,66 +1984,50 @@
{
// there are two bindings.. one is ACKed, the other is not, the server is restarted
// The other binding is acked... The file must be deleted
+ HornetQServer server = createServer(true, isNetty());
- try
- {
+ server.start();
- server = createServer(true, isNetty());
+ SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
- server.start();
+ ClientSessionFactory sf = locator.createSessionFactory();
- SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
- ClientSessionFactory sf = locator.createSessionFactory();
+ session.createQueue(LargeMessageTest.ADDRESS, queue[0], null, true);
+ session.createQueue(LargeMessageTest.ADDRESS, queue[1], null, true);
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+ int numberOfBytes = 400000;
- session.createQueue(LargeMessageTest.ADDRESS, queue[0], null, true);
- session.createQueue(LargeMessageTest.ADDRESS, queue[1], null, true);
+ Message clientFile = createLargeClientMessage(session, numberOfBytes);
- int numberOfBytes = 400000;
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ producer.send(clientFile);
- Message clientFile = createLargeClientMessage(session, numberOfBytes);
+ producer.close();
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- producer.send(clientFile);
+ readMessage(session, queue[1], numberOfBytes);
- producer.close();
+ if (restart)
+ {
+ session.close();
- readMessage(session, queue[1], numberOfBytes);
+ server.stop();
- if (restart)
- {
- session.close();
+ server = createServer(true, isNetty());
- server.stop();
+ server.start();
- server = createServer(true, isNetty());
+ sf = locator.createSessionFactory();
- server.start();
+ session = sf.createSession(null, null, false, true, true, false, 0);
+ }
- sf = locator.createSessionFactory();
+ readMessage(session, queue[0], numberOfBytes);
- session = sf.createSession(null, null, false, true, true, false, 0);
- }
+ session.close();
- readMessage(session, queue[0], numberOfBytes);
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
+ validateNoFilesOnLargeDir();
}
public void testSendRollbackXADurable() throws Exception
@@ -2171,75 +2054,53 @@
{
ClientSession session = null;
- try
- {
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
- server.start();
+ server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
- session = sf.createSession(isXA, false, false);
+ session = sf.createSession(isXA, false, false);
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
- Xid xid = null;
+ Xid xid = null;
- if (isXA)
- {
- xid = RandomUtil.randomXid();
- session.start(xid, XAResource.TMNOFLAGS);
- }
+ if (isXA)
+ {
+ xid = RandomUtil.randomXid();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- Message clientFile = createLargeClientMessage(session, 50000, durable);
+ Message clientFile = createLargeClientMessage(session, 50000, durable);
- for (int i = 0; i < 1; i++)
- {
- producer.send(clientFile);
- }
+ for (int i = 0; i < 1; i++)
+ {
+ producer.send(clientFile);
+ }
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
- session.close();
- server.stop();
- server.start();
- sf = locator.createSessionFactory();
- session = sf.createSession(isXA, false, false);
-
- session.rollback(xid);
- }
- else
- {
- session.rollback();
- }
-
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
session.close();
+ server.stop();
+ server.start();
+ sf = locator.createSessionFactory();
+ session = sf.createSession(isXA, false, false);
- validateNoFilesOnLargeDir();
+ session.rollback(xid);
}
- finally
+ else
{
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
- session.close();
- }
- catch (Throwable ignored)
- {
- }
+ session.rollback();
}
+ session.close();
+
+ validateNoFilesOnLargeDir();
}
public void testSimpleRollback() throws Exception
@@ -2256,132 +2117,116 @@
{
// there are two bindings.. one is ACKed, the other is not, the server is restarted
// The other binding is acked... The file must be deleted
+ HornetQServer server = createServer(true, isNetty());
- try
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(isXA, false, false);
+
+ Xid xid = null;
+
+ if (isXA)
{
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
- server = createServer(true, isNetty());
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, null, true);
- server.start();
+ int numberOfBytes = 200000;
- ClientSessionFactory sf = locator.createSessionFactory();
+ session.start();
- ClientSession session = sf.createSession(isXA, false, false);
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- Xid xid = null;
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ for (int n = 0; n < 10; n++)
+ {
+ Message clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
+
+ producer.send(clientFile);
+
+ Assert.assertNull(consumer.receiveImmediate());
+
if (isXA)
{
+ session.end(xid, XAResource.TMSUCCESS);
+ session.rollback(xid);
xid = newXID();
session.start(xid, XAResource.TMNOFLAGS);
}
+ else
+ {
+ session.rollback();
+ }
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, null, true);
+ clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
- int numberOfBytes = 200000;
+ producer.send(clientFile);
- session.start();
+ Assert.assertNull(consumer.receiveImmediate());
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
-
- for (int n = 0; n < 10; n++)
+ for (int i = 0; i < 2; i++)
{
- Message clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
- producer.send(clientFile);
+ ClientMessage clientMessage = consumer.receive(5000);
- Assert.assertNull(consumer.receiveImmediate());
+ Assert.assertNotNull(clientMessage);
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.rollback();
- }
+ Assert.assertEquals(numberOfBytes, clientMessage.getBodySize());
- clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
+ clientMessage.acknowledge();
- producer.send(clientFile);
-
- Assert.assertNull(consumer.receiveImmediate());
-
if (isXA)
{
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
+ if (i == 0)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
}
else
{
- session.commit();
- }
-
- for (int i = 0; i < 2; i++)
- {
-
- ClientMessage clientMessage = consumer.receive(5000);
-
- Assert.assertNotNull(clientMessage);
-
- Assert.assertEquals(numberOfBytes, clientMessage.getBodySize());
-
- clientMessage.acknowledge();
-
- if (isXA)
+ if (i == 0)
{
- if (i == 0)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
+ session.rollback();
}
else
{
- if (i == 0)
- {
- session.rollback();
- }
- else
- {
- session.commit();
- }
+ session.commit();
}
}
}
-
- session.close();
-
- validateNoFilesOnLargeDir();
}
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
+ session.close();
+
+ validateNoFilesOnLargeDir();
}
public void testBufferMultipleLargeMessages() throws Exception
@@ -2604,144 +2449,128 @@
AddressSettings value = new AddressSettings();
map.put(LargeMessageTest.ADDRESS.toString(), value);
- server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
server.start();
final int numberOfBytes = 1024;
final int numberOfBytesBigMessage = 400000;
- try
- {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setBlockOnAcknowledge(true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-0"), null, true);
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-1"), null, true);
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-0"), null, true);
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-1"), null, true);
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ ClientMessage message = null;
- ClientMessage message = null;
+ for (int i = 0; i < 100; i++)
+ {
+ message = session.createMessage(true);
- for (int i = 0; i < 100; i++)
- {
- message = session.createMessage(true);
+ message.getBodyBuffer().writerIndex(0);
- message.getBodyBuffer().writerIndex(0);
+ message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
- message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
-
- for (int j = 1; j <= numberOfBytes; j++)
- {
- message.getBodyBuffer().writeInt(j);
- }
-
- producer.send(message);
+ for (int j = 1; j <= numberOfBytes; j++)
+ {
+ message.getBodyBuffer().writeInt(j);
}
- ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
- clientFile.putBooleanProperty("TestLarge", true);
- producer.send(clientFile);
+ producer.send(message);
+ }
- for (int i = 0; i < 100; i++)
- {
- message = session.createMessage(true);
+ ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
+ clientFile.putBooleanProperty("TestLarge", true);
+ producer.send(clientFile);
- message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+ for (int i = 0; i < 100; i++)
+ {
+ message = session.createMessage(true);
- producer.send(message);
- }
+ message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
- session.close();
+ producer.send(message);
+ }
- server.stop();
+ session.close();
- server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
- server.start();
+ server.stop();
- sf = locator.createSessionFactory();
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ server.start();
- for (int ad = 0; ad < 2; ad++)
- {
- session = sf.createSession(false, false, false);
+ sf = locator.createSessionFactory();
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
+ for (int ad = 0; ad < 2; ad++)
+ {
+ session = sf.createSession(false, false, false);
- session.start();
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
- for (int i = 0; i < 100; i++)
- {
- ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ session.start();
- Assert.assertNotNull(message2);
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
- message2.acknowledge();
+ Assert.assertNotNull(message2);
- Assert.assertNotNull(message2);
- }
+ message2.acknowledge();
- session.commit();
+ Assert.assertNotNull(message2);
+ }
- for (int i = 0; i < 5; i++)
- {
- ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
+ session.commit();
- assertTrue(messageLarge.getBooleanProperty("TestLarge"));
+ for (int i = 0; i < 5; i++)
+ {
+ ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
- Assert.assertNotNull(messageLarge);
+ assertTrue(messageLarge.getBooleanProperty("TestLarge"));
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- messageLarge.acknowledge();
- messageLarge.saveToOutputStream(bout);
- byte[] body = bout.toByteArray();
- assertEquals(numberOfBytesBigMessage, body.length);
- for (int bi = 0; bi < body.length; bi++)
- {
- assertEquals(getSamplebyte(bi), body[bi]);
- }
+ Assert.assertNotNull(messageLarge);
- if (i < 4)
- session.rollback();
- else
- session.commit();
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ messageLarge.acknowledge();
+ messageLarge.saveToOutputStream(bout);
+ byte[] body = bout.toByteArray();
+ assertEquals(numberOfBytesBigMessage, body.length);
+ for (int bi = 0; bi < body.length; bi++)
+ {
+ assertEquals(getSamplebyte(bi), body[bi]);
}
- for (int i = 0; i < 100; i++)
- {
- ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ if (i < 4)
+ session.rollback();
+ else
+ session.commit();
+ }
- Assert.assertNotNull(message2);
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
- message2.acknowledge();
+ Assert.assertNotNull(message2);
- Assert.assertNotNull(message2);
- }
+ message2.acknowledge();
- session.commit();
+ Assert.assertNotNull(message2);
+ }
- consumer.close();
+ session.commit();
- session.close();
+ consumer.close();
- }
+ session.close();
}
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
@@ -2758,128 +2587,111 @@
AddressSettings value = new AddressSettings();
map.put(LargeMessageTest.ADDRESS.toString(), value);
- server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
server.start();
final int numberOfBytes = 1024;
final int numberOfBytesBigMessage = 400000;
- try
- {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setCompressLargeMessage(true);
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setBlockOnAcknowledge(true);
- locator.setCompressLargeMessage(true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(false, true, true);
- ClientSession session = sf.createSession(false, true, true);
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-0"), null, true);
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-1"), null, true);
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-0"), null, true);
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-1"), null, true);
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ int msgId = 0;
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- int msgId = 0;
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message = session.createMessage(true);
- for (int i = 0; i < 100; i++)
- {
- ClientMessage message = session.createMessage(true);
+ message.putIntProperty("msgID", msgId++);
- message.putIntProperty("msgID", msgId++);
+ message.putBooleanProperty("TestLarge", false);
- message.putBooleanProperty("TestLarge", false);
+ message.getBodyBuffer().writerIndex(0);
- message.getBodyBuffer().writerIndex(0);
+ message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
- message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
-
- for (int j = 1; j <= numberOfBytes; j++)
- {
- message.getBodyBuffer().writeInt(j);
- }
-
- producer.send(message);
+ for (int j = 1; j <= numberOfBytes; j++)
+ {
+ message.getBodyBuffer().writeInt(j);
}
+ producer.send(message);
+ }
- for (int i = 0; i < 10; i++)
- {
- ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
- clientFile.putBooleanProperty("TestLarge", true);
- producer.send(clientFile);
- }
- session.close();
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
+ clientFile.putBooleanProperty("TestLarge", true);
+ producer.send(clientFile);
+ }
- for (int ad = 0; ad < 2; ad++)
- {
- session = sf.createSession(false, false, false);
+ session.close();
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
+ for (int ad = 0; ad < 2; ad++)
+ {
+ session = sf.createSession(false, false, false);
- session.start();
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
- for (int received = 0 ; received < 5; received++)
+ session.start();
+
+ for (int received = 0 ; received < 5; received++)
+ {
+ for (int i = 0; i < 100; i++)
{
- for (int i = 0; i < 100; i++)
- {
- ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
- Assert.assertNotNull(message2);
+ Assert.assertNotNull(message2);
- assertFalse(message2.getBooleanProperty("TestLarge"));
+ assertFalse(message2.getBooleanProperty("TestLarge"));
- message2.acknowledge();
+ message2.acknowledge();
- Assert.assertNotNull(message2);
- }
+ Assert.assertNotNull(message2);
+ }
- for (int i = 0; i < 10; i++)
- {
- ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
- Assert.assertNotNull(messageLarge);
+ Assert.assertNotNull(messageLarge);
- assertTrue(messageLarge.getBooleanProperty("TestLarge"));
+ assertTrue(messageLarge.getBooleanProperty("TestLarge"));
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
- messageLarge.acknowledge();
+ messageLarge.acknowledge();
- messageLarge.saveToOutputStream(bout);
- byte[] body = bout.toByteArray();
- assertEquals(numberOfBytesBigMessage, body.length);
- for (int bi = 0; bi < body.length; bi++)
- {
- assertEquals(getSamplebyte(bi), body[bi]);
- }
+ messageLarge.saveToOutputStream(bout);
+ byte[] body = bout.toByteArray();
+ assertEquals(numberOfBytesBigMessage, body.length);
+ for (int bi = 0; bi < body.length; bi++)
+ {
+ assertEquals(getSamplebyte(bi), body[bi]);
}
-
- session.rollback();
}
- session.commit();
+ session.rollback();
+ }
- consumer.close();
+ session.commit();
- session.close();
+ consumer.close();
- }
+ session.close();
}
- finally
- {
- locator.close();
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testSendStreamingSingleMessage() throws Exception
@@ -3135,110 +2947,96 @@
AddressSettings value = new AddressSettings();
map.put(LargeMessageTest.ADDRESS.toString(), value);
- server = createServer(realFiles, config, PAGE_SIZE, PAGE_MAX, map);
+ HornetQServer server = createServer(realFiles, config, PAGE_SIZE, PAGE_MAX, map);
server.start();
final int numberOfBytes = 1024;
final int numberOfBytesBigMessage = 400000;
- try
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ if (sendBlocking)
{
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf.getServerLocator().setBlockOnNonDurableSend(true);
+ sf.getServerLocator().setBlockOnDurableSend(true);
+ sf.getServerLocator().setBlockOnAcknowledge(true);
+ }
- if (sendBlocking)
- {
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
- sf.getServerLocator().setBlockOnAcknowledge(true);
- }
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, null, true);
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, null, true);
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ ClientMessage message = null;
- ClientMessage message = null;
+ for (int i = 0; i < 100; i++)
+ {
+ message = session.createMessage(true);
- for (int i = 0; i < 100; i++)
- {
- message = session.createMessage(true);
+ // TODO: Why do I need to reset the writerIndex?
+ message.getBodyBuffer().writerIndex(0);
- // TODO: Why do I need to reset the writerIndex?
- message.getBodyBuffer().writerIndex(0);
-
- for (int j = 1; j <= numberOfBytes; j++)
- {
- message.getBodyBuffer().writeInt(j);
- }
-
- producer.send(message);
+ for (int j = 1; j <= numberOfBytes; j++)
+ {
+ message.getBodyBuffer().writeInt(j);
}
- ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
+ producer.send(message);
+ }
- producer.send(clientFile);
+ ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
- session.close();
+ producer.send(clientFile);
- if (realFiles)
- {
- server.stop();
+ session.close();
- server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
- server.start();
+ if (realFiles)
+ {
+ server.stop();
- sf = locator.createSessionFactory();
- }
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ server.start();
- session = sf.createSession(null, null, false, true, true, false, 0);
+ sf = locator.createSessionFactory();
+ }
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ session = sf.createSession(null, null, false, true, true, false, 0);
- session.start();
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
- for (int i = 0; i < 100; i++)
- {
- ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ session.start();
- Assert.assertNotNull(message2);
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
- message2.acknowledge();
+ Assert.assertNotNull(message2);
- Assert.assertNotNull(message2);
+ message2.acknowledge();
- message.getBodyBuffer().readerIndex(0);
+ Assert.assertNotNull(message2);
- for (int j = 1; j <= numberOfBytes; j++)
- {
- Assert.assertEquals(j, message.getBodyBuffer().readInt());
- }
+ message.getBodyBuffer().readerIndex(0);
+
+ for (int j = 1; j <= numberOfBytes; j++)
+ {
+ Assert.assertEquals(j, message.getBodyBuffer().readInt());
}
+ }
- consumer.close();
+ consumer.close();
- session.close();
+ session.close();
- session = sf.createSession(null, null, false, true, true, false, 0);
+ session = sf.createSession(null, null, false, true, true, false, 0);
- readMessage(session, LargeMessageTest.ADDRESS, numberOfBytesBigMessage);
+ readMessage(session, LargeMessageTest.ADDRESS, numberOfBytesBigMessage);
- // printBuffer("message received : ", message2.getBody());
+ // printBuffer("message received : ", message2.getBody());
- session.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
+ session.close();
}
// Private -------------------------------------------------------
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2011-12-05 14:14:43 UTC (rev 11829)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2011-12-05 14:18:54 UTC (rev 11830)
@@ -56,8 +56,6 @@
protected final SimpleString ADDRESS = new SimpleString("SimpleAddress");
- protected HornetQServer server;
-
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -70,26 +68,6 @@
// Protected -----------------------------------------------------
- @Override
- protected void tearDown() throws Exception
- {
- if (server != null && server.isStarted())
- {
- try
- {
- server.stop();
- }
- catch (Exception e)
- {
- LargeMessageTestBase.log.warn(e.getMessage(), e);
- }
- }
-
- server = null;
-
- super.tearDown();
- }
-
protected void testChunks(final boolean isXA,
final boolean restartOnXA,
final boolean rollbackFirstSend,
@@ -139,7 +117,7 @@
{
clearData();
- server = createServer(realFiles);
+ HornetQServer server = createServer(realFiles);
server.start();
ServerLocator locator = createInVMNonHALocator();