[hornetq-commits] JBoss hornetq SVN: r10712 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/client and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Sat May 21 12:29:15 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-05-21 12:29:14 -0400 (Sat, 21 May 2011)
New Revision: 10712
Modified:
trunk/hornetq.ipr
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java
trunk/tests/unit-tests/src/main/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/RandomUtil.java
trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/UnitTestCase.java
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Fixing the build by applying the test changes from Branch_2_2.
Modified: trunk/hornetq.ipr
===================================================================
--- trunk/hornetq.ipr 2011-05-20 15:47:57 UTC (rev 10711)
+++ trunk/hornetq.ipr 2011-05-21 16:29:14 UTC (rev 10712)
@@ -1,5 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
+ <component name="ASMPluginConfiguration">
+ <asm skipDebug="false" skipFrames="false" skipCode="false" expandFrames="false" />
+ <groovy codeStyle="LEGACY" />
+ </component>
<component name="AntConfiguration">
<defaultAnt bundledAnt="true" />
<buildFile url="file://$PROJECT_DIR$/build-hornetq.xml">
@@ -85,7 +89,7 @@
<element module="All" copyright="new" />
</module2copyright>
</component>
- <component name="CppTools.Loader" reportImplicitCastToBool="false" warnedAboutFileOutOfSourceRoot="true" version="1" currentProject="$PROJECT_DIR$/native/Makefile" />
+ <component name="CppTools.Loader" reportImplicitCastToBool="false" reportNameReferencedOnce="false" warnedAboutFileOutOfSourceRoot="true" version="3" currentProject="$PROJECT_DIR$/native/Makefile" compilerSelect="AUTO" />
<component name="DependenciesAnalyzeManager">
<option name="myForwardDirection" value="false" />
</component>
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-05-20 15:47:57 UTC (rev 10711)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-05-21 16:29:14 UTC (rev 10712)
@@ -13,10 +13,13 @@
package org.hornetq.tests.integration.client;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -42,9 +45,16 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
@@ -262,8 +272,7 @@
consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
-
-
+
assertEquals(numberOfMessages, queue.getMessageCount());
ClientMessage msg = consumer.receive(5000);
@@ -284,7 +293,7 @@
}
assertNull(msg);
- for (int i = xids.size() -1 ; i >= 0; i--)
+ for (int i = xids.size() - 1; i >= 0; i--)
{
Xid xid = xids.get(i);
session.rollback(xid);
@@ -298,25 +307,25 @@
session = sf.createSession(false, false, false);
session.start();
-
+
consumer = session.createConsumer(PagingTest.ADDRESS);
-
+
for (int i = 0; i < numberOfMessages; i++)
{
msg = consumer.receive(1000);
assertNotNull(msg);
msg.acknowledge();
-
+
assertEquals(i, msg.getIntProperty("id").intValue());
-
+
if (i % 500 == 0)
{
session.commit();
}
}
-
+
session.commit();
-
+
session.close();
sf.close();
@@ -324,14 +333,13 @@
locator.close();
assertEquals(0, queue.getMessageCount());
-
+
long timeout = System.currentTimeMillis() + 5000;
while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
{
Thread.sleep(100);
}
- assertFalse (queue.getPageSubscription().getPagingStore().isPaging());
- // assertEquals(numberOfMessages, queue.getMessageCount());
+ assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
}
finally
{
@@ -346,6 +354,350 @@
}
+ public void testMissingTXEverythingAcked() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 5000;
+
+ final int numberOfTX = 10;
+
+ final int messagesPerTX = numberOfMessages / numberOfTX;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS.toString(), "q1", true);
+
+ session.createQueue(ADDRESS.toString(), "q2", true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % messagesPerTX == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ session.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
+
+ List<PreparedTransactionInfo> list = new ArrayList<PreparedTransactionInfo>();
+
+ JournalImpl jrn = new JournalImpl(config.getJournalFileSize(),
+ 2,
+ 0,
+ 0,
+ new NIOSequentialFileFactory(getJournalDir()),
+ "hornetq-data",
+ "hq",
+ 1);
+ jrn.start();
+ jrn.load(records, list, null);
+
+ // Delete everything from the journal
+ for (RecordInfo info : records)
+ {
+ if (!info.isUpdate)
+ {
+ jrn.appendDeleteRecord(info.id, false);
+ }
+ }
+
+ jrn.stop();
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ Page pg = server.getPagingManager().getPageStore(ADDRESS).getCurrentPage();
+
+ pg.open();
+
+ List<PagedMessage> msgs = pg.read(server.getStorageManager());
+
+ pg.close();
+
+ long queues[] = new long[] { server.locateQueue(new SimpleString("q1")).getID() };
+
+ for (long q : queues)
+ {
+ for (int i = 0; i < msgs.size(); i++)
+ {
+ server.getStorageManager().storeCursorAcknowledge(q, new PagePositionImpl(pg.getPageId(), i));
+ }
+ }
+
+ server.stop();
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ ClientSessionFactory csf = locator.createSessionFactory();
+
+ ClientSession sess = csf.createSession();
+
+ sess.start();
+
+ ClientConsumer cons = sess.createConsumer("q1");
+
+ assertNull(cons.receive(500));
+
+ Thread.sleep(5000);
+
+ ClientConsumer cons2 = sess.createConsumer("q2");
+ assertNull(cons2.receive(500));
+
+ long timeout = System.currentTimeMillis() + 5000;
+
+ while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+ {
+ Thread.sleep(100);
+ }
+
+ assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+ sess.close();
+
+ locator.close();
+
+ server.stop();
+ }
+
+ public void testMissingTXEverythingAcked2() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 6;
+
+ final int numberOfTX = 2;
+
+ final int messagesPerTX = numberOfMessages / numberOfTX;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS.toString(), "q1", true);
+
+ session.createQueue(ADDRESS.toString(), "q2", true);
+
+ server.getPagingManager().getPageStore(ADDRESS).startPaging();
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putStringProperty("id", "str-" + i);
+
+ producer.send(message);
+ if ((i + 1) % messagesPerTX == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+
+ session.start();
+
+ for (int i = 1; i <= 2; i++)
+ {
+ ClientConsumer cons = session.createConsumer("q" + i);
+
+ for (int j = 0; j < 3; j++)
+ {
+ ClientMessage msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ assertEquals("str-" + j, msg.getStringProperty("id"));
+
+ msg.acknowledge();
+ }
+
+ session.commit();
+
+ }
+
+ session.close();
+ }
+ finally
+ {
+ locator.close();
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory csf = locator.createSessionFactory();
+
+ ClientSession session = csf.createSession();
+
+ session.start();
+
+ for (int i = 1; i <= 2; i++)
+ {
+ ClientConsumer cons = session.createConsumer("q" + i);
+
+ for (int j = 3; j < 6; j++)
+ {
+ ClientMessage msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ assertEquals("str-" + j, msg.getStringProperty("id"));
+
+ msg.acknowledge();
+ }
+
+ session.commit();
+ assertNull(cons.receive(500));
+
+ }
+
+ session.close();
+
+ long timeout = System.currentTimeMillis() + 5000;
+
+ while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+ {
+ Thread.sleep(100);
+ }
+
+ locator.close();
+
+ server.stop();
+ }
+
public void testTwoQueuesOneNoRouting() throws Exception
{
boolean persistentMessages = true;
@@ -2454,6 +2806,8 @@
catch (Throwable ignored)
{
}
+
+ OperationContextImpl.clearContext();
}
}
@@ -2787,8 +3141,7 @@
}
}
}
-
-
+
public void testPageAndDepageRapidly() throws Exception
{
boolean persistentMessages = true;
@@ -2800,11 +3153,7 @@
config.setJournalSyncNonTransactional(false);
config.setJournalFileSize(10 * 1024 * 1024);
- HornetQServer server = createServer(true,
- config,
- 512 * 1024,
- 1024 * 1024,
- new HashMap<String, AddressSettings>());
+ HornetQServer server = createServer(true, config, 512 * 1024, 1024 * 1024, new HashMap<String, AddressSettings>());
server.start();
@@ -2827,9 +3176,9 @@
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
-
+
final AtomicInteger errors = new AtomicInteger(0);
-
+
Thread consumeThread = new Thread()
{
public void run()
@@ -2839,16 +3188,16 @@
{
sessionConsumer = sf.createSession(false, false);
sessionConsumer.start();
-
+
ClientConsumer cons = sessionConsumer.createConsumer(ADDRESS);
-
+
for (int i = 0; i < numberOfMessages; i++)
{
ClientMessage msg = cons.receive(PagingTest.RECEIVE_TIMEOUT);
System.out.println("Message " + i + " consumed");
assertNotNull(msg);
msg.acknowledge();
-
+
if (i % 20 == 0)
{
System.out.println("Commit consumer");
@@ -2874,10 +3223,10 @@
errors.incrementAndGet();
}
}
-
+
}
};
-
+
consumeThread.start();
ClientMessage message = null;
@@ -2887,7 +3236,7 @@
for (int i = 0; i < numberOfMessages; i++)
{
message = session.createMessage(persistentMessages);
-
+
System.out.println("Message " + i + " sent");
HornetQBuffer bodyLocal = message.getBodyBuffer();
@@ -2897,23 +3246,24 @@
message.putIntProperty(new SimpleString("id"), i);
producer.send(message);
-
+
Thread.sleep(50);
}
-
consumeThread.join();
-
+
long timeout = System.currentTimeMillis() + 5000;
-
- while (System.currentTimeMillis() < timeout && (server.getPagingManager().getPageStore(ADDRESS).isPaging() || server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages() != 1))
+
+ while (System.currentTimeMillis() < timeout && (server.getPagingManager().getPageStore(ADDRESS).isPaging() || server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getNumberOfPages() != 1))
{
Thread.sleep(1);
}
// It's async, so need to wait a bit for it happening
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
-
+
assertEquals(1, server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
sf.close();
@@ -2933,7 +3283,6 @@
}
-
public void testTwoQueuesDifferentFilters() throws Exception
{
boolean persistentMessages = true;
@@ -2959,7 +3308,7 @@
try
{
ServerLocator locator = createInVMNonHALocator();
-
+
locator.setClientFailureCheckPeriod(120000);
locator.setConnectionTTL(5000000);
locator.setCallTimeout(120000);
@@ -2971,14 +3320,16 @@
ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
-
+
// note: if you want to change this, numberOfMessages has to be a multiple of NQUEUES
int NQUEUES = 2;
-
- for (int i = 0 ; i < NQUEUES; i++)
+ for (int i = 0; i < NQUEUES; i++)
{
- session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=" + i), new SimpleString("propTest=" + i), true);
+ session.createQueue(PagingTest.ADDRESS,
+ PagingTest.ADDRESS.concat("=" + i),
+ new SimpleString("propTest=" + i),
+ true);
}
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@@ -3012,20 +3363,20 @@
for (int nqueue = 0; nqueue < NQUEUES; nqueue++)
{
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=" + nqueue));
-
- for (int i = 0; i < (numberOfMessages /NQUEUES); i++)
+
+ for (int i = 0; i < (numberOfMessages / NQUEUES); i++)
{
message = consumer.receive(500000);
assertNotNull(message);
message.acknowledge();
-
+
assertEquals(nqueue, message.getIntProperty("propTest").intValue());
}
-
+
assertNull(consumer.receiveImmediate());
-
+
consumer.close();
-
+
session.commit();
}
@@ -3038,7 +3389,6 @@
Thread.sleep(100);
}
-
// It's async, so need to wait a bit for it happening
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
@@ -3056,11 +3406,8 @@
{
}
}
-
}
-
-
public void testTwoQueues() throws Exception
{
boolean persistentMessages = true;
@@ -3086,7 +3433,7 @@
try
{
ServerLocator locator = createInVMNonHALocator();
-
+
locator.setClientFailureCheckPeriod(120000);
locator.setConnectionTTL(5000000);
locator.setCallTimeout(120000);
@@ -3098,7 +3445,6 @@
ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
-
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), null, true);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), null, true);
@@ -3133,22 +3479,22 @@
for (int msg = 1; msg <= 2; msg++)
{
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=" + msg));
-
+
for (int i = 0; i < numberOfMessages; i++)
{
message = consumer.receive(500000);
assertNotNull(message);
message.acknowledge();
-
- //assertEquals(msg, message.getIntProperty("propTest").intValue());
-
+
+ // assertEquals(msg, message.getIntProperty("propTest").intValue());
+
System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
}
-
+
session.commit();
-
+
assertNull(consumer.receiveImmediate());
-
+
consumer.close();
}
@@ -3162,10 +3508,9 @@
}
store.getCursorProvier().cleanup();
-
+
Thread.sleep(1000);
-
-
+
// It's async, so need to wait a bit for it happening
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
@@ -3183,11 +3528,374 @@
{
}
}
+ }
+ public void testDLAOnLargeMessageAndPaging() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ Map<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
+ AddressSettings dla = new AddressSettings();
+ dla.setMaxDeliveryAttempts(5);
+ dla.setDeadLetterAddress(new SimpleString("DLA"));
+ settings.put(ADDRESS.toString(), dla);
+
+ final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ session.createQueue("DLA", "DLA");
+
+ PagingStore pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+ pgStoreAddress.startPaging();
+ PagingStore pgStoreDLA = server.getPagingManager().getPageStore(new SimpleString("DLA"));
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < 100; i++)
+ {
+ log.info("send message #" + i);
+ message = session.createMessage(true);
+
+ message.putStringProperty("id", "str" + i);
+
+ message.setBodyInputStream(createFakeLargeStream(messageSize));
+
+ producer.send(message);
+
+ if ((i + 1) % 2 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer(ADDRESS);
+
+ ClientMessage msg = null;
+
+ for (int msgNr = 0 ; msgNr < 2; msgNr++)
+ {
+ for (int i = 0 ; i < 5; i++)
+ {
+ msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+
+ assertEquals("str" + msgNr, msg.getStringProperty("id"));
+
+ for (int j = 0; j < messageSize; j++)
+ {
+ assertEquals(getSamplebyte(j), msg.getBodyBuffer().readByte());
+ }
+
+ session.rollback();
+ }
+
+ pgStoreDLA.startPaging();
+ }
+
+ for (int i = 2; i < 100; i++)
+ {
+ log.info("Received message " + i);
+ message = cons.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ message.saveToOutputStream(new OutputStream()
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+
+ }
+ });
+
+ }
+
+ assertNull(cons.receiveImmediate());
+
+ cons.close();
+
+ sf.close();
+
+ locator.close();
+
+ server.stop();
+
+ server.start();
+
+ locator = createInVMNonHALocator();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false);
+
+ session.start();
+
+ cons = session.createConsumer(ADDRESS);
+
+ for (int i = 2; i < 100; i++)
+ {
+ log.info("Received message " + i);
+ message = cons.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ message.saveToOutputStream(new OutputStream()
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+
+ }
+ });
+ }
+
+ cons.close();
+
+ cons = session.createConsumer("DLA");
+
+ for (int msgNr = 0 ; msgNr < 2; msgNr++)
+ {
+ msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ assertEquals("str" + msgNr, msg.getStringProperty("id"));
+
+ for (int i = 0; i < messageSize; i++)
+ {
+ assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte());
+ }
+
+ msg.acknowledge();
+ }
+
+ cons.close();
+
+ cons = session.createConsumer(ADDRESS);
+
+ session.commit();
+
+ assertNull(cons.receiveImmediate());
+
+ long timeout = System.currentTimeMillis() + 5000;
+
+ pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+
+ pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
+
+ pgStoreAddress.getCursorProvier().cleanup();
+
+ while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging())
+ {
+ Thread.sleep(50);
+ }
+
+ assertFalse(pgStoreAddress.isPaging());
+
+ session.commit();
+
+ session.close();
+ }
+ finally
+ {
+ locator.close();
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
}
+ public void testExpireLargeMessageOnPaging() throws Exception
+ {
+ clearData();
+ Configuration config = createDefaultConfig();
+ config.setMessageExpiryScanPeriod(500);
+ config.setJournalSyncNonTransactional(false);
+
+ Map<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
+ AddressSettings dla = new AddressSettings();
+ dla.setMaxDeliveryAttempts(5);
+ dla.setDeadLetterAddress(new SimpleString("DLA"));
+ dla.setExpiryAddress(new SimpleString("DLA"));
+ settings.put(ADDRESS.toString(), dla);
+
+ final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
+
+ server.start();
+
+ final int messageSize = 20;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ session.createQueue("DLA", "DLA");
+
+ PagingStore pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+ pgStoreAddress.startPaging();
+ PagingStore pgStoreDLA = server.getPagingManager().getPageStore(new SimpleString("DLA"));
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < 500; i++)
+ {
+ if (i % 100 == 0) log.info("send message #" + i);
+ message = session.createMessage(true);
+
+ message.putStringProperty("id", "str" + i);
+
+ message.setExpiration(System.currentTimeMillis() + 2000);
+
+ if (i % 2 == 0)
+ {
+ message.setBodyInputStream(createFakeLargeStream(messageSize));
+ }
+ else
+ {
+ byte bytes[] = new byte[messageSize];
+ for (int s = 0 ; s < bytes.length; s++)
+ {
+ bytes[s] = getSamplebyte(s);
+ }
+ message.getBodyBuffer().writeBytes(bytes);
+ }
+
+ producer.send(message);
+
+ if ((i + 1) % 2 == 0)
+ {
+ session.commit();
+ if (i < 400)
+ {
+ pgStoreAddress.forceAnotherPage();
+ }
+ }
+ }
+
+ session.commit();
+
+ sf.close();
+
+ locator.close();
+
+ server.stop();
+
+ Thread.sleep(3000);
+
+ server.start();
+
+ locator = createInVMNonHALocator();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false);
+
+ session.start();
+
+ ClientConsumer consAddr = session.createConsumer(ADDRESS);
+
+ assertNull(consAddr.receive(1000));
+
+
+ ClientConsumer cons = session.createConsumer("DLA");
+
+ for (int i = 0; i < 500; i++)
+ {
+ log.info("Received message " + i);
+ message = cons.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ message.saveToOutputStream(new OutputStream()
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+
+ }
+ });
+ }
+
+ assertNull(cons.receiveImmediate());
+
+ session.commit();
+
+ cons.close();
+
+ long timeout = System.currentTimeMillis() + 5000;
+
+ pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+
+ while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging())
+ {
+ Thread.sleep(50);
+ }
+
+ assertFalse(pgStoreAddress.isPaging());
+
+ session.close();
+ }
+ finally
+ {
+ locator.close();
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2011-05-20 15:47:57 UTC (rev 10711)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2011-05-21 16:29:14 UTC (rev 10712)
@@ -16,6 +16,7 @@
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@@ -29,6 +30,7 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
@@ -363,7 +365,126 @@
}
+
+ public void testTemoraryQueuesWithFilter() throws Exception
+ {
+
+ int countTmpQueue=0;
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ class MyHandler implements MessageHandler
+ {
+ final String color;
+
+ final CountDownLatch latch;
+
+ final ClientSession sess;
+
+ public MyHandler(ClientSession sess, String color, int expectedMessages)
+ {
+ this.sess = sess;
+ latch = new CountDownLatch(expectedMessages);
+ this.color = color;
+ }
+
+ public boolean waitCompletion() throws Exception
+ {
+ return latch.await(10, TimeUnit.SECONDS);
+ }
+
+ public void onMessage(ClientMessage message)
+ {
+ try
+ {
+ message.acknowledge();
+ sess.commit();
+ latch.countDown();
+
+ if (!message.getStringProperty("color").equals(color))
+ {
+ log.warn("Unexpected color " + message.getStringProperty("color") + " when we were expecting " + color);
+ errors.incrementAndGet();
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ }
+ }
+
+ }
+
+ String address = "AD_test";
+ int iterations = 100;
+ int msgs = 100;
+
+ // Will be using a single Session as this is how an issue was raised
+ for (int i = 0 ; i < iterations; i++)
+ {
+ ClientSessionFactory clientsConnecton = locator.createSessionFactory();
+ ClientSession localSession = clientsConnecton.createSession();
+
+ ClientProducer prod = localSession.createProducer(address);
+
+ localSession.start();
+
+ log.info("Iteration " + i);
+ String queueRed = address + "_red_" + (countTmpQueue++);
+ String queueBlue = address + "_blue_" + (countTmpQueue++);
+
+ //ClientSession sessConsumerRed = clientsConnecton.createSession();
+ ClientSession sessConsumerRed = localSession;
+ sessConsumerRed.createTemporaryQueue(address, queueRed, "color='red'");
+ MyHandler redHandler = new MyHandler(sessConsumerRed, "red", msgs);
+ ClientConsumer redClientConsumer = sessConsumerRed.createConsumer(queueRed);
+ redClientConsumer.setMessageHandler(redHandler);
+ //sessConsumerRed.start();
+
+ //ClientSession sessConsumerBlue = clientsConnecton.createSession();
+ ClientSession sessConsumerBlue = localSession;
+ sessConsumerBlue.createTemporaryQueue(address, queueBlue, "color='blue'");
+ MyHandler blueHandler = new MyHandler(sessConsumerBlue, "blue", msgs);
+ ClientConsumer blueClientConsumer = sessConsumerBlue.createConsumer(queueBlue);
+ blueClientConsumer.setMessageHandler(blueHandler);
+ //sessConsumerBlue.start();
+
+ try
+ {
+ ClientMessage msgBlue = session.createMessage(false);
+ msgBlue.putStringProperty("color", "blue");
+
+ ClientMessage msgRed = session.createMessage(false);
+ msgRed.putStringProperty("color", "red");
+
+ for (int nmsg = 0; nmsg < msgs; nmsg++)
+ {
+ prod.send(msgBlue);
+
+ prod.send(msgRed);
+
+ session.commit();
+ }
+
+ blueHandler.waitCompletion();
+ redHandler.waitCompletion();
+
+ assertEquals(0, errors.get());
+
+ }
+ finally
+ {
+// sessConsumerRed.close();
+// sessConsumerBlue.close();
+ localSession.close();
+ clientsConnecton.close();
+ }
+ }
+
+ }
+
public void testDeleteTemporaryQueueWhenClientCrash() throws Exception
{
session.close();
Modified: trunk/tests/unit-tests/src/main/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/unit-tests/src/main/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-05-20 15:47:57 UTC (rev 10711)
+++ trunk/tests/unit-tests/src/main/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-05-21 16:29:14 UTC (rev 10712)
@@ -674,10 +674,13 @@
{
}
- @Override
- public void copyTo(SequentialFile newFileName) throws Exception
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFile#copyTo(org.hornetq.core.journal.SequentialFile)
+ */
+ public void copyTo(SequentialFile newFileName)
{
- throw new UnsupportedOperationException();
+ // TODO Auto-generated method stub
+
}
}
Modified: trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/RandomUtil.java
===================================================================
--- trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/RandomUtil.java 2011-05-20 15:47:57 UTC (rev 10711)
+++ trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/RandomUtil.java 2011-05-21 16:29:14 UTC (rev 10712)
@@ -1,14 +1,14 @@
/*
* Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package org.hornetq.tests.util;
Modified: trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/UnitTestCase.java 2011-05-20 15:47:57 UTC (rev 10711)
+++ trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/UnitTestCase.java 2011-05-21 16:29:14 UTC (rev 10712)
@@ -1,14 +1,14 @@
/*
* Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package org.hornetq.tests.util;
@@ -541,8 +541,7 @@
}
catch (Exception e)
{
- System.out.println("port " + port + " is already bound");
- System.exit(0);
+ throw new IllegalStateException("port " + port + " is already bound");
}
finally
{
@@ -791,11 +790,11 @@
/** It validates a Bean (POJO) using simple setters and getters with random values.
* You can pass a list of properties to be ignored, as some properties will have a pre-defined domain (not being possible to use random-values on them) */
- protected void validateGettersAndSetters(final Object pojo, final String... IgnoredProperties) throws Exception
+ protected void validateGettersAndSetters(final Object pojo, final String... ignoredProperties) throws Exception
{
HashSet<String> ignoreSet = new HashSet<String>();
- for (String ignore : IgnoredProperties)
+ for (String ignore : ignoredProperties)
{
ignoreSet.add(ignore);
}
@@ -860,6 +859,8 @@
protected void setUp() throws Exception
{
super.setUp();
+
+ OperationContextImpl.clearContext();
deleteDirectory(new File(getTestDir()));
Modified: trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-05-20 15:47:57 UTC (rev 10711)
+++ trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-05-21 16:29:14 UTC (rev 10712)
@@ -143,12 +143,11 @@
100,
createMockManager(),
createStorageManagerMock(),
- createPostOfficeMock(),
factory,
null,
PagingStoreImplTest.destinationTestName,
addressSettings,
- getExecutorFactory(),
+ getExecutorFactory().getExecutor(),
true);
storeImpl.start();
@@ -181,12 +180,11 @@
100,
createMockManager(),
createStorageManagerMock(),
- createPostOfficeMock(),
factory,
storeFactory,
PagingStoreImplTest.destinationTestName,
addressSettings,
- getExecutorFactory(),
+ getExecutorFactory().getExecutor(),
true);
storeImpl.start();
@@ -219,12 +217,11 @@
100,
createMockManager(),
createStorageManagerMock(),
- createPostOfficeMock(),
factory,
null,
PagingStoreImplTest.destinationTestName,
addressSettings,
- getExecutorFactory(),
+ getExecutorFactory().getExecutor(),
true);
storeImpl.start();
@@ -248,12 +245,11 @@
100,
createMockManager(),
createStorageManagerMock(),
- createPostOfficeMock(),
factory,
storeFactory,
PagingStoreImplTest.destinationTestName,
addressSettings,
- getExecutorFactory(),
+ getExecutorFactory().getExecutor(),
true);
storeImpl.start();
@@ -325,12 +321,11 @@
100,
createMockManager(),
createStorageManagerMock(),
- createPostOfficeMock(),
factory,
storeFactory,
PagingStoreImplTest.destinationTestName,
addressSettings,
- getExecutorFactory(),
+ getExecutorFactory().getExecutor(),
true);
storeImpl.start();
@@ -474,12 +469,11 @@
100,
createMockManager(),
createStorageManagerMock(),
- createPostOfficeMock(),
factory,
storeFactory,
new SimpleString("test"),
settings,
- getExecutorFactory(),
+ getExecutorFactory().getExecutor(),
true);
storeImpl.start();
@@ -639,12 +633,11 @@
100,
createMockManager(),
createStorageManagerMock(),
- createPostOfficeMock(),
factory,
storeFactory,
new SimpleString("test"),
settings,
- getExecutorFactory(),
+ getExecutorFactory().getExecutor(),
true);
storeImpl2.start();
@@ -726,12 +719,11 @@
100,
createMockManager(),
createStorageManagerMock(),
- createPostOfficeMock(),
factory,
storeFactory,
new SimpleString("test"),
settings,
- getExecutorFactory(),
+ getExecutorFactory().getExecutor(),
true);
storeImpl.start();
@@ -770,12 +762,11 @@
100,
createMockManager(),
createStorageManagerMock(),
- createPostOfficeMock(),
factory,
storeFactory,
new SimpleString("test"),
settings,
- getExecutorFactory(),
+ getExecutorFactory().getExecutor(),
false);
storeImpl.start();
More information about the hornetq-commits
mailing list