[hornetq-commits] JBoss hornetq SVN: r10181 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor/impl and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Feb 4 18:25:01 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-02-04 18:25:01 -0500 (Fri, 04 Feb 2011)
New Revision: 10181
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Improvements on non-blocking paging
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-02-04 23:25:01 UTC (rev 10181)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.cursor;
import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.server.Queue;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.LinkedListIterator;
@@ -30,6 +31,8 @@
// Cursor query operations --------------------------------------
+ PagingStore getPagingStore();
+
// To be called before the server is down
void stop();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-02-04 23:25:01 UTC (rev 10181)
@@ -49,7 +49,7 @@
public synchronized PagedMessage getPagedMessage()
{
- PagedMessage returnMessage = message.get();
+ PagedMessage returnMessage = message != null ? message.get() : null;
// We only keep a few references on the Queue from paging...
// Besides those references are SoftReferenced on page cache...
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-02-04 23:25:01 UTC (rev 10181)
@@ -69,11 +69,11 @@
// Attributes ----------------------------------------------------
- private final boolean isTrace = false; // PageCursorImpl.log.isTraceEnabled();
+ private final boolean isTrace = PageSubscriptionImpl.log.isTraceEnabled();
private static void trace(final String message)
{
- PageSubscriptionImpl.log.info(message);
+ PageSubscriptionImpl.log.trace(message);
}
private volatile boolean autoCleanup = true;
@@ -131,6 +131,11 @@
// Public --------------------------------------------------------
+ public PagingStore getPagingStore()
+ {
+ return pageStore;
+ }
+
public Queue getQueue()
{
return queue;
@@ -534,6 +539,7 @@
*/
public void reloadPreparedACK(final Transaction tx, final PagePosition position)
{
+ deliveredCount.incrementAndGet();
installTXCallback(tx, position);
}
@@ -777,6 +783,8 @@
// It needs to persist, otherwise the cursor will return to the fist page position
tx.setContainsPersistent();
}
+
+ getPageInfo(position).remove(position);
PageCursorTX cursorTX = (PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
@@ -903,6 +911,7 @@
public void decrementPendingTX()
{
pendingTX.decrementAndGet();
+ checkDone();
}
public boolean isRemoved(final PagePosition pos)
@@ -928,17 +937,26 @@
", page = " +
pageId);
}
-
+
// Negative could mean a bookmark on the first element for the page (example -1)
if (posACK.getMessageNr() >= 0)
{
- if (getNumberOfMessages() == confirmed.incrementAndGet())
- {
- onPageDone(this);
- }
+ confirmed.incrementAndGet();
+ checkDone();
}
}
+ /**
+ *
+ */
+ protected void checkDone()
+ {
+ if (isDone())
+ {
+ onPageDone(this);
+ }
+ }
+
private int getNumberOfMessages()
{
if (wasLive)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-02-04 23:25:01 UTC (rev 10181)
@@ -56,6 +56,7 @@
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
+import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.GroupingInfo;
@@ -1715,6 +1716,7 @@
if (sub != null)
{
sub.reloadPreparedACK(tx, encoding.position);
+ referencesToAck.add(new PagedReferenceImpl(encoding.position, null, sub));
}
else
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-04 23:25:01 UTC (rev 10181)
@@ -32,6 +32,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.persistence.StorageManager;
@@ -103,6 +104,9 @@
// The quantity of pagedReferences on messageREferences priority list
private final AtomicInteger pagedReferences = new AtomicInteger(0);
+
+ // The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
+ private final AtomicInteger queueMemorySize = new AtomicInteger(0);
private final List<ConsumerHolder> consumerList = new ArrayList<ConsumerHolder>();
@@ -327,6 +331,7 @@
public synchronized void reload(final MessageReference ref)
{
+ queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
if (!scheduledDeliveryHandler.checkAndSchedule(ref))
{
internalAddTail(ref);
@@ -375,6 +380,8 @@
{
return;
}
+
+ queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
concurrentQueue.add(ref);
@@ -1214,22 +1221,18 @@
*/
private void internalAddTail(final MessageReference ref)
{
- if (ref.isPaged())
- {
- pagedReferences.incrementAndGet();
- }
+ refAdded(ref);
messageReferences.addTail(ref, ref.getMessage().getPriority());
}
+
/**
* @param ref
*/
private void internalAddHead(final MessageReference ref)
{
- if (ref.isPaged())
- {
- pagedReferences.incrementAndGet();
- }
+ queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+ refAdded(ref);
messageReferences.addHead(ref, ref.getMessage().getPriority());
}
@@ -1392,12 +1395,25 @@
*/
private void refRemoved(MessageReference ref)
{
+ queueMemorySize.addAndGet(-ref.getMessage().getMemoryEstimate());
if (ref.isPaged())
{
pagedReferences.decrementAndGet();
}
}
+ /**
+ * @param ref
+ */
+ protected void refAdded(final MessageReference ref)
+ {
+ if (ref.isPaged())
+ {
+ pagedReferences.incrementAndGet();
+ }
+ }
+
+
private void scheduleDepage()
{
executor.execute(depageRunner);
@@ -1405,33 +1421,23 @@
private void depage()
{
- if (paused || consumerList.isEmpty())
+ if (paused || pageIterator == null || consumerList.isEmpty())
{
return;
}
- int msgsToDeliver = MAX_DELIVERIES_IN_LOOP - (messageReferences.size() + getScheduledCount() + concurrentQueue.size());
+ long maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
- if (msgsToDeliver > 0)
+ //System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
+ while (queueMemorySize.get() < maxSize && pageIterator.hasNext())
{
- //System.out.println("Depaging " + msgsToDeliver + " messages");
- //System.out.println("Depage " + msgsToDeliver + " now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
-
- int nmessages = 0;
- while (nmessages < msgsToDeliver && pageIterator.hasNext())
- {
- nmessages ++;
- addTail(pageIterator.next(), false);
- pageIterator.remove();
- }
-
- //System.out.println("Depaged " + nmessages);
+ PagedReference reference = pageIterator.next();
+ addTail(reference, false);
+ pageIterator.remove();
}
-// else
-// {
-// System.out.println("Depaging not being done now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
-// }
+ //System.out.println("QueueMemorySize after depage = " + queueMemorySize.get() + " depaged " + nmessages);
+
deliverAsync();
}
@@ -1737,13 +1743,26 @@
private void postAcknowledge(final MessageReference ref)
{
+ QueueImpl queue = (QueueImpl)ref.getQueue();
+
+ queue.deliveringCount.decrementAndGet();
+
+ if (queue.deliveringCount.get() < 0)
+ {
+ new Exception("DeliveringCount became negative").printStackTrace();
+ }
+
+ if (ref.isPaged())
+ {
+ // nothing to be done
+ return;
+ }
+
final ServerMessage message = ref.getMessage();
- QueueImpl queue = (QueueImpl)ref.getQueue();
-
boolean durableRef = message.isDurable() && queue.durable;
- if (durableRef && ! ref.isPaged())
+ if (durableRef)
{
int count = message.decrementDurableRefCount();
@@ -1775,13 +1794,6 @@
}
}
- queue.deliveringCount.decrementAndGet();
-
- if (queue.deliveringCount.get() < 0)
- {
- new Exception("DeliveringCount became negative").printStackTrace();
- }
-
try
{
message.decrementRefCount();
@@ -1827,10 +1839,19 @@
private final class RefsOperation implements TransactionOperation
{
List<MessageReference> refsToAck = new ArrayList<MessageReference>();
+ List<ServerMessage> pagedMessagesToPostACK = null;
synchronized void addAck(final MessageReference ref)
{
refsToAck.add(ref);
+ if (ref.isPaged())
+ {
+ if (pagedMessagesToPostACK == null)
+ {
+ pagedMessagesToPostACK = new ArrayList<ServerMessage>();
+ }
+ pagedMessagesToPostACK.add(ref.getMessage());
+ }
}
public void beforeCommit(final Transaction tx) throws Exception
@@ -1891,6 +1912,21 @@
postAcknowledge(ref);
}
}
+
+ if (pagedMessagesToPostACK != null)
+ {
+ for (ServerMessage msg : pagedMessagesToPostACK)
+ {
+ try
+ {
+ msg.decrementRefCount();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
}
public void beforePrepare(final Transaction tx) throws Exception
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-04 23:25:01 UTC (rev 10181)
@@ -112,11 +112,19 @@
super.tearDown();
}
+ public void testRepeat() throws Exception
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ System.out.println(" ####################### test " + i);
+ testPreparePersistent();
+ tearDown();
+ setUp();
+ }
+ }
+
public void testPreparePersistent() throws Exception
{
- boolean persistentMessages = true;
-
- System.out.println("PageDir:" + getPageDir());
clearData();
Configuration config = createDefaultConfig();
@@ -133,8 +141,12 @@
final int messageSize = 1024;
- final int numberOfMessages = 10000;
+ final int numberOfMessages = 5000;
+ final int numberOfTX = 10;
+
+ final int messagesPerTX = numberOfMessages / numberOfTX;
+
try
{
ServerLocator locator = createInVMNonHALocator();
@@ -164,7 +176,7 @@
for (int i = 0; i < numberOfMessages; i++)
{
- message = session.createMessage(persistentMessages);
+ message = session.createMessage(true);
HornetQBuffer bodyLocal = message.getBodyBuffer();
@@ -204,7 +216,7 @@
LinkedList<Xid> xids = new LinkedList<Xid>();
int msgReceived = 0;
- for (int i = 0; i < numberOfMessages / 999; i++)
+ for (int i = 0; i < numberOfTX; i++)
{
ClientSession sessionConsumer = sf.createSession(true, false, false);
Xid xid = newXID();
@@ -212,14 +224,14 @@
sessionConsumer.start(xid, XAResource.TMNOFLAGS);
sessionConsumer.start();
ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
- for (int msgCount = 0; msgCount < 1000; i++)
+ for (int msgCount = 0; msgCount < messagesPerTX; msgCount++)
{
if (msgReceived == numberOfMessages)
{
break;
}
- System.out.println("MsgReceived = " + (msgReceived++));
- ClientMessage msg = consumer.receive(5000);
+ msgReceived++;
+ ClientMessage msg = consumer.receive(10000);
assertNotNull(msg);
msg.acknowledge();
}
@@ -236,8 +248,6 @@
sessionCheck.close();
- System.out.println(queue.getMessagesAdded());
-
assertEquals(numberOfMessages, queue.getMessageCount());
sf.close();
@@ -262,25 +272,75 @@
consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
+
+
+ assertEquals(numberOfMessages, queue.getMessageCount());
- assertNull(consumer.receiveImmediate());
+ ClientMessage msg = consumer.receive(5000);
+ if (msg != null)
+ {
+ System.out.println("Msg " + msg.getIntProperty("id"));
- for (Xid xid : xids)
+ while (true)
+ {
+ ClientMessage msg2 = consumer.receive(1000);
+ if (msg2 == null)
+ {
+ break;
+ }
+ System.out.println("Msg received again : " + msg2.getIntProperty("id"));
+
+ }
+ }
+ assertNull(msg);
+
+ for (int i = xids.size() -1 ; i >= 0; i--)
{
+ Xid xid = xids.get(i);
session.rollback(xid);
}
+ System.out.println("msgCount = " + queue.getMessageCount());
xids.clear();
- assertNotNull(consumer.receiveImmediate());
+ session.close();
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ consumer = session.createConsumer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ assertEquals(i, msg.getIntProperty("id").intValue());
+
+ if (i % 500 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
session.close();
sf.close();
locator.close();
- queue.getMessageCount();
+ assertEquals(0, queue.getMessageCount());
+
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse (queue.getPageSubscription().getPagingStore().isPaging());
// assertEquals(numberOfMessages, queue.getMessageCount());
}
finally
@@ -331,7 +391,10 @@
ClientSession session = sf.createSession(false, false, false);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
- session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("-invalid"), new SimpleString(HornetQServerImpl.GENERIC_IGNORED_FILTER), true);
+ session.createQueue(PagingTest.ADDRESS,
+ PagingTest.ADDRESS.concat("-invalid"),
+ new SimpleString(HornetQServerImpl.GENERIC_IGNORED_FILTER),
+ true);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@@ -374,16 +437,16 @@
session.commit();
}
}
-
+
session.commit();
-
+
session.commit();
session.commit();
-
+
PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
store.getCursorProvier().cleanup();
-
+
long timeout = System.currentTimeMillis() + 5000;
while (store.isPaging() && timeout > System.currentTimeMillis())
{
@@ -402,7 +465,7 @@
try
{
server.stop();
- // System.exit(-1);
+ // System.exit(-1);
}
catch (Throwable ignored)
{
@@ -1792,6 +1855,10 @@
for (int i = 0; i < numberOfMessages; i++)
{
System.out.println("Received " + i);
+ if (i == 55)
+ {
+ System.out.println("i = 55");
+ }
ClientMessage msg = consumer.receive(5000);
Assert.assertNotNull(msg);
msg.acknowledge();
More information about the hornetq-commits mailing list