Author: clebert.suconic(a)jboss.com
Date: 2011-04-18 23:31:49 -0400 (Mon, 18 Apr 2011)
New Revision: 10528
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
JBPAPP-6237 - Fixing Large Message over Live Cache
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2011-04-18
16:52:01 UTC (rev 10527)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2011-04-19
03:31:49 UTC (rev 10528)
@@ -19,6 +19,7 @@
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.LivePageCache;
+import org.hornetq.core.server.LargeServerMessage;
/**
* This is the same as PageCache, however this is for the page that's being currently
written.
@@ -132,6 +133,10 @@
*/
public synchronized void addLiveMessage(PagedMessage message)
{
+ if (message.getMessage().isLargeMessage())
+ {
+ ((LargeServerMessage)message.getMessage()).incrementDelayDeletionCount();
+ }
this.messages.add(message);
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-04-18
16:52:01 UTC (rev 10527)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-04-19
03:31:49 UTC (rev 10528)
@@ -17,6 +17,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -231,8 +232,8 @@
{
if (entry.getKey() == lastAckedPosition.getPageNr())
{
- // PageSubscriptionImpl.trace("We can't clear page " +
entry.getKey() +
- // " now since it's the current page");
+ PageSubscriptionImpl.trace("We can't clear page " +
entry.getKey() +
+ " now since it's the current page");
}
else
{
@@ -846,7 +847,7 @@
private final long pageId;
// Confirmed ACKs on this page
- private final List<PagePosition> acks = Collections.synchronizedList(new
LinkedList<PagePosition>());
+ private final Set<PagePosition> acks = Collections.synchronizedSet(new
LinkedHashSet<PagePosition>());
private WeakReference<PageCache> cache;
@@ -934,8 +935,6 @@
public void addACK(final PagePosition posACK)
{
- removedReferences.add(posACK);
- acks.add(posACK);
if (isTrace)
{
@@ -944,11 +943,14 @@
(confirmed.get() + 1) +
" pendingTX = " + pendingTX +
", page = " +
- pageId);
+ pageId + " posACK = " + posACK);
}
+ removedReferences.add(posACK);
+ boolean added = acks.add(posACK);
+
// Negative could mean a bookmark on the first element for the page (example
-1)
- if (posACK.getMessageNr() >= 0)
+ if (added && posACK.getMessageNr() >= 0)
{
confirmed.incrementAndGet();
checkDone();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-04-18
16:52:01 UTC (rev 10527)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-04-19
03:31:49 UTC (rev 10528)
@@ -2450,6 +2450,7 @@
}
}
+ // JBPAPP-6237
public void testPageOnLargeMessageMultipleQueues() throws Exception
{
Configuration config = createDefaultConfig(isNetty());
@@ -2602,6 +2603,144 @@
}
+
+ // JBPAPP-6237
+ public void testPageOnLargeMessageMultipleQueues2() throws Exception
+ {
+ Configuration config = createDefaultConfig(isNetty());
+
+ final int PAGE_MAX = 20 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+
+ HashMap<String, AddressSettings> map = new HashMap<String,
AddressSettings>();
+
+ AddressSettings value = new AddressSettings();
+ map.put(LargeMessageTest.ADDRESS.toString(), value);
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ server.start();
+
+ final int numberOfBytes = 1024;
+
+ final int numberOfBytesBigMessage = 400000;
+
+ try
+ {
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setCompressLargeMessage(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(LargeMessageTest.ADDRESS,
LargeMessageTest.ADDRESS.concat("-0"), null, true);
+ session.createQueue(LargeMessageTest.ADDRESS,
LargeMessageTest.ADDRESS.concat("-1"), null, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ int msgId = 0;
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ message.putIntProperty("msgID", msgId++);
+
+ message.putBooleanProperty("TestLarge", false);
+
+ message.getBodyBuffer().writerIndex(0);
+
+ message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+
+ for (int j = 1; j <= numberOfBytes; j++)
+ {
+ message.getBodyBuffer().writeInt(j);
+ }
+
+ producer.send(message);
+ }
+
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage clientFile = createLargeClientMessage(session,
numberOfBytesBigMessage);
+ clientFile.putBooleanProperty("TestLarge", true);
+ producer.send(clientFile);
+ }
+
+ session.close();
+
+ for (int ad = 0; ad < 2; ad++)
+ {
+ session = sf.createSession(false, false, false);
+
+ ClientConsumer consumer =
session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
+
+ session.start();
+
+ for (int received = 0 ; received < 5; received++)
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 =
consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+ Assert.assertNotNull(message2);
+
+ assertFalse(message2.getBooleanProperty("TestLarge"));
+
+ message2.acknowledge();
+
+ Assert.assertNotNull(message2);
+ }
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
+
+ Assert.assertNotNull(messageLarge);
+
+ assertTrue(messageLarge.getBooleanProperty("TestLarge"));
+
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+
+ messageLarge.acknowledge();
+
+ messageLarge.saveToOutputStream(bout);
+ byte[] body = bout.toByteArray();
+ assertEquals(numberOfBytesBigMessage, body.length);
+ for (int bi = 0; bi < body.length; bi++)
+ {
+ assertEquals(getSamplebyte(bi), body[bi]);
+ }
+ }
+
+ session.rollback();
+ }
+
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+
+ }
+ }
+ finally
+ {
+ locator.close();
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testSendStreamingSingleMessage() throws Exception
{
ClientSession session = null;
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-04-18
16:52:01 UTC (rev 10527)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-04-19
03:31:49 UTC (rev 10528)
@@ -2934,7 +2934,260 @@
}
+ public void testTwoQueuesDifferentFilters() throws Exception
+ {
+ boolean persistentMessages = true;
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 200;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(120000);
+ locator.setConnectionTTL(5000000);
+ locator.setCallTimeout(120000);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ // note: if you want to change this, numberOfMessages has to be a multiple of
NQUEUES
+ int NQUEUES = 2;
+
+
+ for (int i = 0 ; i < NQUEUES; i++)
+ {
+ session.createQueue(PagingTest.ADDRESS,
PagingTest.ADDRESS.concat("=" + i), new SimpleString("propTest=" + i),
true);
+ }
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty("propTest", i % NQUEUES);
+ message.putIntProperty("id", i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.start();
+
+ for (int nqueue = 0; nqueue < NQUEUES; nqueue++)
+ {
+ ClientConsumer consumer =
session.createConsumer(PagingTest.ADDRESS.concat("=" + nqueue));
+
+ for (int i = 0; i < (numberOfMessages /NQUEUES); i++)
+ {
+ message = consumer.receive(500000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ assertEquals(nqueue,
message.getIntProperty("propTest").intValue());
+ }
+
+ assertNull(consumer.receiveImmediate());
+
+ consumer.close();
+
+ session.commit();
+ }
+
+ PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+ store.getCursorProvier().cleanup();
+
+ long timeout = System.currentTimeMillis() + 5000;
+ while (store.isPaging() && timeout > System.currentTimeMillis())
+ {
+ Thread.sleep(100);
+ }
+
+
+ // It's async, so need to wait a bit for it happening
+ assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+ sf.close();
+
+ locator.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+
+
+ public void testTwoQueues() throws Exception
+ {
+ boolean persistentMessages = true;
+
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(120000);
+ locator.setConnectionTTL(5000000);
+ locator.setCallTimeout(120000);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+
+ session.createQueue(PagingTest.ADDRESS,
PagingTest.ADDRESS.concat("=1"), null, true);
+ session.createQueue(PagingTest.ADDRESS,
PagingTest.ADDRESS.concat("=2"), null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty("propTest", i % 2 == 0 ? 1 : 2);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.start();
+
+ for (int msg = 1; msg <= 2; msg++)
+ {
+ ClientConsumer consumer =
session.createConsumer(PagingTest.ADDRESS.concat("=" + msg));
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = consumer.receive(500000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ //assertEquals(msg,
message.getIntProperty("propTest").intValue());
+
+ System.out.println("i = " + i + " msg = " +
message.getIntProperty("propTest"));
+ }
+
+ session.commit();
+
+ assertNull(consumer.receiveImmediate());
+
+ consumer.close();
+ }
+
+ PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+ store.getCursorProvier().cleanup();
+
+ long timeout = System.currentTimeMillis() + 5000;
+ while (store.isPaging() && timeout > System.currentTimeMillis())
+ {
+ Thread.sleep(100);
+ }
+
+ store.getCursorProvier().cleanup();
+
+ Thread.sleep(1000);
+
+
+ // It's async, so need to wait a bit for it happening
+ assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+ sf.close();
+
+ locator.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------