Author: clebert.suconic(a)jboss.com
Date: 2010-11-09 14:27:38 -0500 (Tue, 09 Nov 2010)
New Revision: 9858
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
commit before a small refactoring
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-09
04:23:26 UTC (rev 9857)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-09
19:27:38 UTC (rev 9858)
@@ -39,6 +39,8 @@
PageCache getPageCache(PagePosition pos);
+ PagedReference newReference(final PagePosition pos, final PagedMessage msg);
+
void addPageCache(PageCache cache);
PagingStore getAssociatedStore();
@@ -52,7 +54,7 @@
PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
- PagedReferenceImpl getNext(PageSubscription cursor, PagePosition pos) throws
Exception;
+ PagedReference getNext(PageSubscription cursor, PagePosition pos) throws Exception;
PagedMessage getMessage(PagePosition pos) throws Exception;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2010-11-09
04:23:26 UTC (rev 9857)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2010-11-09
19:27:38 UTC (rev 9858)
@@ -17,6 +17,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
/**
* A InternalReference
@@ -33,7 +34,11 @@
private PagePosition a;
private PagedMessage b;
+ private Queue queue;
+ private PageSubscription subscription;
+
+
public ServerMessage getMessage()
{
return b.getMessage();
@@ -140,4 +145,22 @@
// TODO Auto-generated method stub
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#acknowledge()
+ */
+ public void acknowledge() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.server.MessageReference#acknowledge(org.hornetq.core.transaction.Transaction)
+ */
+ public void acknowledge(Transaction tx) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-09
04:23:26 UTC (rev 9857)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-09
19:27:38 UTC (rev 9858)
@@ -30,6 +30,7 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
@@ -122,12 +123,12 @@
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
*/
- public PagedReferenceImpl getNext(final PageSubscription cursor, PagePosition
cursorPos) throws Exception
+ public PagedReference getNext(final PageSubscription cursor, PagePosition cursorPos)
throws Exception
{
while (true)
{
- PagedReferenceImpl retPos = internalGetNext(cursorPos);
+ PagedReference retPos = internalGetNext(cursorPos);
if (retPos == null)
{
@@ -182,7 +183,7 @@
return false;
}
- private PagedReferenceImpl internalGetNext(final PagePosition pos)
+ private PagedReference internalGetNext(final PagePosition pos)
{
PagePosition retPos = pos.nextMessage();
@@ -209,7 +210,7 @@
if (serverMessage != null)
{
- return new PagedReferenceImpl(retPos, cache.getMessage(retPos.getMessageNr()));
+ return newReference(retPos, serverMessage);
}
else
{
@@ -229,6 +230,11 @@
return cache.getMessage(pos.getMessageNr());
}
+
+ public PagedReference newReference(final PagePosition pos, final PagedMessage msg)
+ {
+ return new PagedReferenceImpl(pos, msg);
+ }
/**
* No need to synchronize this method since the private getPageCache will have a
synchronized call
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-09
04:23:26 UTC (rev 9857)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-09
19:27:38 UTC (rev 9858)
@@ -177,7 +177,7 @@
/** next element taken on hasNext test.
* it has to be delivered on next next operation */
- PagedReferenceImpl cachedNext;
+ PagedReference cachedNext;
public void repeat()
{
@@ -201,13 +201,14 @@
/* (non-Javadoc)
* @see java.util.Iterator#next()
*/
- public synchronized PagedReferenceImpl next()
+ public synchronized PagedReference next()
{
if (cachedNext != null)
{
- PagedReferenceImpl retPos = cachedNext;
+ PagedReference retPos = cachedNext;
cachedNext = null;
+ System.out.println("Returning cached next " + retPos);
return retPos;
}
@@ -215,8 +216,9 @@
{
if (redeliveryIterator.hasNext())
{
+ // There's a redelivery pending, we will get it out of that pool
instead
isredelivery = true;
- return getMessage(redeliveryIterator.next());
+ return getReference(redeliveryIterator.next());
}
else
{
@@ -228,7 +230,7 @@
position = getStartPosition();
}
- PagedReferenceImpl nextPos = moveNext(position);
+ PagedReference nextPos = moveNext(position);
if (nextPos != null)
{
lastOperation = position;
@@ -278,9 +280,9 @@
}
}
- private PagedReferenceImpl getMessage(PagePosition pos) throws Exception
+ private PagedReference getReference(PagePosition pos) throws Exception
{
- return new PagedReferenceImpl(pos, cursorProvider.getMessage(pos));
+ return cursorProvider.newReference(pos, cursorProvider.getMessage(pos));
}
/* (non-Javadoc)
@@ -294,11 +296,11 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
- public synchronized PagedReferenceImpl moveNext(PagePosition position) throws
Exception
+ public synchronized PagedReference moveNext(PagePosition position) throws Exception
{
boolean match = false;
- PagedReferenceImpl message = null;
+ PagedReference message = null;
PagePosition tmpPosition = position;
@@ -307,12 +309,16 @@
message = cursorProvider.getNext(this, tmpPosition);
boolean valid = true;
+
if (message == null)
{
valid = false;
}
else
{
+ // We don't create a PageCursorInfo unless we are doing a write operation
(ack or removing)
+ // Say you have a Browser that will only read the files... there's no
need to control PageCursors if nothing
+ // is being changed. That's why the false is passed as a parameter here
PageCursorInfo info = getPageInfo(message.getPosition(), false);
if (info != null && info.isRemoved(message.getPosition()))
{
@@ -847,6 +853,11 @@
// Inner classes -------------------------------------------------
+ /**
+ * This will hold information about the pending ACKs towards a page.
+ * This instance will be released as soon as the entire page is consumed, releasing
the memory at that point
+ * The ref counts are increased also when a message is ignored for any reason.
+ * */
private class PageCursorInfo
{
// Number of messages existent on this page
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java 2010-11-09
04:23:26 UTC (rev 9857)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java 2010-11-09
19:27:38 UTC (rev 9858)
@@ -13,6 +13,8 @@
package org.hornetq.core.server;
+import org.hornetq.core.transaction.Transaction;
+
/**
* A reference to a message.
*
@@ -51,6 +53,11 @@
void decrementDeliveryCount();
Queue getQueue();
+
+ void acknowledge() throws Exception;
+
+ void acknowledge(final Transaction tx) throws Exception;
+
void handled();
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-11-09
04:23:26 UTC (rev 9857)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-11-09
19:27:38 UTC (rev 9858)
@@ -29,6 +29,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.Transaction;
/**
* A queue that will discard messages if a newer message with the same
MessageImpl.HDR_LAST_VALUE_NAME property value.
@@ -92,7 +93,7 @@
try
{
- super.acknowledge(oldRef);
+ oldRef.acknowledge();
}
catch (Exception e)
{
@@ -233,5 +234,21 @@
{
return false;
}
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.server.MessageReference#acknowledge()
+ */
+ public void acknowledge() throws Exception
+ {
+ ref.acknowledge();
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.server.MessageReference#acknowledge(org.hornetq.core.transaction.Transaction)
+ */
+ public void acknowledge(Transaction tx) throws Exception
+ {
+ ref.acknowledge(tx);
+ }
}
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2010-11-09
04:23:26 UTC (rev 9857)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2010-11-09
19:27:38 UTC (rev 9858)
@@ -17,6 +17,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.MemorySize;
/**
@@ -150,6 +151,23 @@
return false;
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.server.MessageReference#acknowledge()
+ */
+ public void acknowledge() throws Exception
+ {
+ queue.acknowledge(this);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.server.MessageReference#acknowledge(org.hornetq.core.transaction.Transaction)
+ */
+ public void acknowledge(Transaction tx) throws Exception
+ {
+ queue.acknowledge(tx, this);
+ }
+
+
// Public --------------------------------------------------------
@Override
@@ -159,7 +177,6 @@
"]:" +
(getMessage().isDurable() ? "RELIABLE" :
"NON-RELIABLE");
}
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-09
04:23:26 UTC (rev 9857)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-09
19:27:38 UTC (rev 9858)
@@ -675,32 +675,46 @@
public void acknowledge(final MessageReference ref) throws Exception
{
- ServerMessage message = ref.getMessage();
-
- boolean durableRef = message.isDurable() && durable;
-
- if (durableRef)
+ if (ref.isPaged())
{
- storageManager.storeAcknowledge(id, message.getMessageID());
+ pageSubscription.ack((PagedReference)ref);
}
+ else
+ {
+ ServerMessage message = ref.getMessage();
+
+ boolean durableRef = message.isDurable() && durable;
+
+ if (durableRef)
+ {
+ storageManager.storeAcknowledge(id, message.getMessageID());
+ }
+ postAcknowledge(ref);
+ }
- postAcknowledge(ref);
}
public void acknowledge(final Transaction tx, final MessageReference ref) throws
Exception
{
- ServerMessage message = ref.getMessage();
-
- boolean durableRef = message.isDurable() && durable;
-
- if (durableRef)
+ if (ref.isPaged())
{
- storageManager.storeAcknowledgeTransactional(tx.getID(), id,
message.getMessageID());
-
- tx.setContainsPersistent();
+ pageSubscription.ackTx(tx, (PagedReference)ref);
}
-
- getRefsOperation(tx).addAck(ref);
+ else
+ {
+ ServerMessage message = ref.getMessage();
+
+ boolean durableRef = message.isDurable() && durable;
+
+ if (durableRef)
+ {
+ storageManager.storeAcknowledgeTransactional(tx.getID(), id,
message.getMessageID());
+
+ tx.setContainsPersistent();
+ }
+
+ getRefsOperation(tx).addAck(ref);
+ }
}
public void reacknowledge(final Transaction tx, final MessageReference ref) throws
Exception
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-11-09
04:23:26 UTC (rev 9857)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-11-09
19:27:38 UTC (rev 9858)
@@ -521,11 +521,11 @@
if (autoCommitAcks || tx == null)
{
- ref.getQueue().acknowledge(ref);
+ ref.acknowledge();
}
else
{
- ref.getQueue().acknowledge(tx, ref);
+ ref.acknowledge(tx);
}
}
while (ref.getMessage().getMessageID() != messageID);
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-09
04:23:26 UTC (rev 9857)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-09
19:27:38 UTC (rev 9858)
@@ -315,6 +315,8 @@
private void internaltestSendReceivePaging(final boolean persistentMessages) throws
Exception
{
+
+ System.out.println("PageDir:" + getPageDir());
clearData();
Configuration config = createDefaultConfig();