JBoss hornetq SVN: r9863 - trunk/examples/common.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2010-11-09 21:27:37 -0500 (Tue, 09 Nov 2010)
New Revision: 9863
Modified:
trunk/examples/common/build.xml
Log:
added default value of client.args property to avoid build error on the examples which don't use client.args
Modified: trunk/examples/common/build.xml
===================================================================
--- trunk/examples/common/build.xml 2010-11-09 23:58:11 UTC (rev 9862)
+++ trunk/examples/common/build.xml 2010-11-10 02:27:37 UTC (rev 9863)
@@ -55,6 +55,7 @@
<property name="classes.dir" value="${build.dir}/classes"/>
<property name="config.dir" value="${imported.basedir}/config"/>
<property name="example.config.dir" value="config"/>
+ <property name="client.args" value=""/>
<path id="extra.classpath">
</path>
@@ -140,6 +141,7 @@
<!--<echo>client classpath = ${clientClasspath}</echo>-->
<property file="${imported.basedir}/config/server.properties"/>
<java classname="${example.classname}" fork="true" resultproperty="example-result">
+ <jvmarg line="${client.args}"/>
<jvmarg value="-Dhornetq.example.server.classpath=${serverclasspath}"/>
<jvmarg value="-Dhornetq.example.server.args=${server.args}"/>
<jvmarg value="-Dhornetq.example.logserveroutput=${hornetq.example.logserveroutput}"/>
15 years, 6 months
JBoss hornetq SVN: r9862 - in branches/Branch_New_Paging/src/main/org/hornetq/core: server/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-09 18:58:11 -0500 (Tue, 09 Nov 2010)
New Revision: 9862
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
changes
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-09 23:38:48 UTC (rev 9861)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-09 23:58:11 UTC (rev 9862)
@@ -51,6 +51,7 @@
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
@@ -988,7 +989,7 @@
return ids;
}
- private long getTransactionID(RoutingContext ctx)
+ private long getTransactionID(RoutingContext ctx) throws Exception
{
Transaction tx = ctx.getTransaction();
if (tx == null)
@@ -997,10 +998,68 @@
}
else
{
+ if (tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION) == null)
+ {
+ PageTransactionInfo pgTX = new PageTransactionInfoImpl(tx.getID());
+ System.out.println("Creating pageTransaction " + pgTX.getTransactionID());
+ storageManager.storePageTransaction(tx.getID(), pgTX);
+ pagingManager.addTransaction(pgTX);
+ tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
+ tx.addOperation(new FinishPageMessageOperation());
+
+ tx.setContainsPersistent();
+ }
+
return tx.getID();
}
}
+
+ private static class FinishPageMessageOperation implements TransactionOperation
+ {
+
+ public void afterCommit(final Transaction tx)
+ {
+ // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
+ // transaction until all the messages were added to the queue
+ // or else we could deliver the messages out of order
+
+ PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+ if (pageTransaction != null)
+ {
+ pageTransaction.commit();
+ }
+ }
+
+ public void afterPrepare(final Transaction tx)
+ {
+ }
+
+ public void afterRollback(final Transaction tx)
+ {
+ PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+ if (tx.getState() == State.PREPARED && pageTransaction != null)
+ {
+ pageTransaction.rollback();
+ }
+ }
+
+ public void beforeCommit(final Transaction tx) throws Exception
+ {
+ }
+
+ public void beforePrepare(final Transaction tx) throws Exception
+ {
+ }
+
+ public void beforeRollback(final Transaction tx) throws Exception
+ {
+ }
+
+ }
+
/**
* This method will remove files from the page system and and route them, doing it transactionally
*
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-09 23:38:48 UTC (rev 9861)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-09 23:58:11 UTC (rev 9862)
@@ -1267,13 +1267,20 @@
{
return;
}
-
- int nmessages = 0;
- while (nmessages < MAX_DELIVERIES_IN_LOOP && pageIterator.hasNext())
+
+ int msgsToDeliver = MAX_DELIVERIES_IN_LOOP - (messageReferences.size() + getScheduledCount() + concurrentQueue.size());
+
+ if (msgsToDeliver > 0)
{
- nmessages ++;
- addTail(pageIterator.next(), false);
- pageIterator.remove();
+ System.out.println("Depaging " + msgsToDeliver + " messages");
+
+ int nmessages = 0;
+ while (nmessages < msgsToDeliver && pageIterator.hasNext())
+ {
+ nmessages ++;
+ addTail(pageIterator.next(), false);
+ pageIterator.remove();
+ }
}
deliverAsync();
15 years, 6 months
JBoss hornetq SVN: r9861 - trunk/examples/common.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-09 18:38:48 -0500 (Tue, 09 Nov 2010)
New Revision: 9861
Modified:
trunk/examples/common/build.xml
Log:
Fixing examples
Modified: trunk/examples/common/build.xml
===================================================================
--- trunk/examples/common/build.xml 2010-11-09 22:59:00 UTC (rev 9860)
+++ trunk/examples/common/build.xml 2010-11-09 23:38:48 UTC (rev 9861)
@@ -140,7 +140,6 @@
<!--<echo>client classpath = ${clientClasspath}</echo>-->
<property file="${imported.basedir}/config/server.properties"/>
<java classname="${example.classname}" fork="true" resultproperty="example-result">
- <jvmarg line="${client.args}"/>
<jvmarg value="-Dhornetq.example.server.classpath=${serverclasspath}"/>
<jvmarg value="-Dhornetq.example.server.args=${server.args}"/>
<jvmarg value="-Dhornetq.example.logserveroutput=${hornetq.example.logserveroutput}"/>
15 years, 6 months
JBoss hornetq SVN: r9860 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-09 17:59:00 -0500 (Tue, 09 Nov 2010)
New Revision: 9860
Modified:
trunk/docs/user-manual/en/configuring-transports.xml
trunk/docs/user-manual/en/using-core.xml
Log:
HORNETQ-565 - fixing typos on the documentation
Modified: trunk/docs/user-manual/en/configuring-transports.xml
===================================================================
--- trunk/docs/user-manual/en/configuring-transports.xml 2010-11-09 19:42:52 UTC (rev 9859)
+++ trunk/docs/user-manual/en/configuring-transports.xml 2010-11-09 22:59:00 UTC (rev 9860)
@@ -96,7 +96,7 @@
<literal>HornetQConnectionFactory</literal> it needs to know what server
that connection factory will create connections to.</para>
<para>That's defined by the <literal>connector-ref</literal> element in the <literal
- >hornetq-jms.xml</literal>file on the server side. Let's take a look at a
+ >hornetq-jms.xml</literal> file on the server side. Let's take a look at a
snipped from a <literal>hornetq-jms.xml</literal> file that shows a JMS
connection factory that references our netty connector defined in our <literal
>hornetq-configuration.xml</literal> file:</para>
Modified: trunk/docs/user-manual/en/using-core.xml
===================================================================
--- trunk/docs/user-manual/en/using-core.xml 2010-11-09 19:42:52 UTC (rev 9859)
+++ trunk/docs/user-manual/en/using-core.xml 2010-11-09 22:59:00 UTC (rev 9860)
@@ -94,7 +94,7 @@
<para>In core, there is no concept of a Topic, Topic is a JMS only term. Instead, in
core, we just deal with <emphasis>addresses</emphasis> and
<emphasis>queues</emphasis>.</para>
- <para>For example, a JMS topic would implemented by a single address to which many
+ <para>For example, a JMS topic would be implemented by a single address to which many
queues are bound. Each queue represents a subscription of the topic. A JMS Queue
would be implemented as a single address to which one queue is bound - that
queue represents the JMS queue.</para>
15 years, 6 months
JBoss hornetq SVN: r9859 - in branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor: impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-09 14:42:52 -0500 (Tue, 09 Nov 2010)
New Revision: 9859
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
Log:
tweak
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-09 19:27:38 UTC (rev 9858)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-09 19:42:52 UTC (rev 9859)
@@ -39,7 +39,7 @@
PageCache getPageCache(PagePosition pos);
- PagedReference newReference(final PagePosition pos, final PagedMessage msg);
+ PagedReference newReference(final PagePosition pos, final PagedMessage msg, PageSubscription sub);
void addPageCache(PageCache cache);
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2010-11-09 19:27:38 UTC (rev 9858)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2010-11-09 19:42:52 UTC (rev 9859)
@@ -28,38 +28,37 @@
*/
public class PagedReferenceImpl implements PagedReference
{
-
+
private static final long serialVersionUID = -8640232251318264710L;
- private PagePosition a;
- private PagedMessage b;
+ private final PagePosition position;
+
+ private final PagedMessage message;
- private Queue queue;
-
- private PageSubscription subscription;
-
-
+ private final PageSubscription subscription;
+
public ServerMessage getMessage()
{
- return b.getMessage();
+ return message.getMessage();
}
-
+
public PagedMessage getPagedMessage()
{
- return b;
+ return message;
}
-
+
public PagePosition getPosition()
{
- return a;
+ return position;
}
- public PagedReferenceImpl(PagePosition a, PagedMessage b)
+ public PagedReferenceImpl(final PagePosition position, final PagedMessage message, final PageSubscription subscription)
{
- this.a = a;
- this.b = b;
+ this.position = position;
+ this.message = message;
+ this.subscription = subscription;
}
-
+
public boolean isPaged()
{
return true;
@@ -68,7 +67,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.MessageReference#copy(org.hornetq.core.server.Queue)
*/
- public MessageReference copy(Queue queue)
+ public MessageReference copy(final Queue queue)
{
// TODO Auto-generated method stub
return null;
@@ -86,10 +85,10 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.MessageReference#setScheduledDeliveryTime(long)
*/
- public void setScheduledDeliveryTime(long scheduledDeliveryTime)
+ public void setScheduledDeliveryTime(final long scheduledDeliveryTime)
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -104,10 +103,10 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.MessageReference#setDeliveryCount(int)
*/
- public void setDeliveryCount(int deliveryCount)
+ public void setDeliveryCount(final int deliveryCount)
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -116,7 +115,7 @@
public void incrementDeliveryCount()
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -125,7 +124,7 @@
public void decrementDeliveryCount()
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -133,8 +132,7 @@
*/
public Queue getQueue()
{
- // TODO Auto-generated method stub
- return null;
+ return subscription.getQueue();
}
/* (non-Javadoc)
@@ -142,8 +140,7 @@
*/
public void handled()
{
- // TODO Auto-generated method stub
-
+ getQueue().referenceHandled();
}
/* (non-Javadoc)
@@ -151,16 +148,14 @@
*/
public void acknowledge() throws Exception
{
- // TODO Auto-generated method stub
-
+ subscription.ack(this);
}
/* (non-Javadoc)
* @see org.hornetq.core.server.MessageReference#acknowledge(org.hornetq.core.transaction.Transaction)
*/
- public void acknowledge(Transaction tx) throws Exception
+ public void acknowledge(final Transaction tx) throws Exception
{
- // TODO Auto-generated method stub
-
+ subscription.ackTx(tx, this);
}
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-09 19:27:38 UTC (rev 9858)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-09 19:42:52 UTC (rev 9859)
@@ -128,7 +128,7 @@
while (true)
{
- PagedReference retPos = internalGetNext(cursorPos);
+ PagedReference retPos = internalGetNext(cursorPos, cursor);
if (retPos == null)
{
@@ -183,7 +183,7 @@
return false;
}
- private PagedReference internalGetNext(final PagePosition pos)
+ private PagedReference internalGetNext(final PagePosition pos, final PageSubscription sub)
{
PagePosition retPos = pos.nextMessage();
@@ -210,7 +210,7 @@
if (serverMessage != null)
{
- return newReference(retPos, serverMessage);
+ return newReference(retPos, serverMessage, sub);
}
else
{
@@ -231,9 +231,9 @@
return cache.getMessage(pos.getMessageNr());
}
- public PagedReference newReference(final PagePosition pos, final PagedMessage msg)
+ public PagedReference newReference(final PagePosition pos, final PagedMessage msg, final PageSubscription subscription)
{
- return new PagedReferenceImpl(pos, msg);
+ return new PagedReferenceImpl(pos, msg, subscription);
}
/**
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-09 19:27:38 UTC (rev 9858)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-09 19:42:52 UTC (rev 9859)
@@ -282,7 +282,7 @@
private PagedReference getReference(PagePosition pos) throws Exception
{
- return cursorProvider.newReference(pos, cursorProvider.getMessage(pos));
+ return cursorProvider.newReference(pos, cursorProvider.getMessage(pos), this);
}
/* (non-Javadoc)
15 years, 6 months
JBoss hornetq SVN: r9858 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-09 14:27:38 -0500 (Tue, 09 Nov 2010)
New Revision: 9858
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
commit before a small refactoring
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-09 19:27:38 UTC (rev 9858)
@@ -39,6 +39,8 @@
PageCache getPageCache(PagePosition pos);
+ PagedReference newReference(final PagePosition pos, final PagedMessage msg);
+
void addPageCache(PageCache cache);
PagingStore getAssociatedStore();
@@ -52,7 +54,7 @@
PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
- PagedReferenceImpl getNext(PageSubscription cursor, PagePosition pos) throws Exception;
+ PagedReference getNext(PageSubscription cursor, PagePosition pos) throws Exception;
PagedMessage getMessage(PagePosition pos) throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2010-11-09 19:27:38 UTC (rev 9858)
@@ -17,6 +17,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
/**
* A InternalReference
@@ -33,7 +34,11 @@
private PagePosition a;
private PagedMessage b;
+ private Queue queue;
+ private PageSubscription subscription;
+
+
public ServerMessage getMessage()
{
return b.getMessage();
@@ -140,4 +145,22 @@
// TODO Auto-generated method stub
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#acknowledge()
+ */
+ public void acknowledge() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#acknowledge(org.hornetq.core.transaction.Transaction)
+ */
+ public void acknowledge(Transaction tx) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-09 19:27:38 UTC (rev 9858)
@@ -30,6 +30,7 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
@@ -122,12 +123,12 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
*/
- public PagedReferenceImpl getNext(final PageSubscription cursor, PagePosition cursorPos) throws Exception
+ public PagedReference getNext(final PageSubscription cursor, PagePosition cursorPos) throws Exception
{
while (true)
{
- PagedReferenceImpl retPos = internalGetNext(cursorPos);
+ PagedReference retPos = internalGetNext(cursorPos);
if (retPos == null)
{
@@ -182,7 +183,7 @@
return false;
}
- private PagedReferenceImpl internalGetNext(final PagePosition pos)
+ private PagedReference internalGetNext(final PagePosition pos)
{
PagePosition retPos = pos.nextMessage();
@@ -209,7 +210,7 @@
if (serverMessage != null)
{
- return new PagedReferenceImpl(retPos, cache.getMessage(retPos.getMessageNr()));
+ return newReference(retPos, serverMessage);
}
else
{
@@ -229,6 +230,11 @@
return cache.getMessage(pos.getMessageNr());
}
+
+ public PagedReference newReference(final PagePosition pos, final PagedMessage msg)
+ {
+ return new PagedReferenceImpl(pos, msg);
+ }
/**
* No need to synchronize this method since the private getPageCache will have a synchronized call
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-09 19:27:38 UTC (rev 9858)
@@ -177,7 +177,7 @@
/** next element taken on hasNext test.
* it has to be delivered on next next operation */
- PagedReferenceImpl cachedNext;
+ PagedReference cachedNext;
public void repeat()
{
@@ -201,13 +201,14 @@
/* (non-Javadoc)
* @see java.util.Iterator#next()
*/
- public synchronized PagedReferenceImpl next()
+ public synchronized PagedReference next()
{
if (cachedNext != null)
{
- PagedReferenceImpl retPos = cachedNext;
+ PagedReference retPos = cachedNext;
cachedNext = null;
+ System.out.println("Returning cached next " + retPos);
return retPos;
}
@@ -215,8 +216,9 @@
{
if (redeliveryIterator.hasNext())
{
+ // There's a redelivery pending, we will get it out of that pool instead
isredelivery = true;
- return getMessage(redeliveryIterator.next());
+ return getReference(redeliveryIterator.next());
}
else
{
@@ -228,7 +230,7 @@
position = getStartPosition();
}
- PagedReferenceImpl nextPos = moveNext(position);
+ PagedReference nextPos = moveNext(position);
if (nextPos != null)
{
lastOperation = position;
@@ -278,9 +280,9 @@
}
}
- private PagedReferenceImpl getMessage(PagePosition pos) throws Exception
+ private PagedReference getReference(PagePosition pos) throws Exception
{
- return new PagedReferenceImpl(pos, cursorProvider.getMessage(pos));
+ return cursorProvider.newReference(pos, cursorProvider.getMessage(pos));
}
/* (non-Javadoc)
@@ -294,11 +296,11 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
- public synchronized PagedReferenceImpl moveNext(PagePosition position) throws Exception
+ public synchronized PagedReference moveNext(PagePosition position) throws Exception
{
boolean match = false;
- PagedReferenceImpl message = null;
+ PagedReference message = null;
PagePosition tmpPosition = position;
@@ -307,12 +309,16 @@
message = cursorProvider.getNext(this, tmpPosition);
boolean valid = true;
+
if (message == null)
{
valid = false;
}
else
{
+ // We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
+ // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing
+ // is being changed. That's why the false is passed as a parameter here
PageCursorInfo info = getPageInfo(message.getPosition(), false);
if (info != null && info.isRemoved(message.getPosition()))
{
@@ -847,6 +853,11 @@
// Inner classes -------------------------------------------------
+ /**
+ * This will hold information about the pending ACKs towards a page.
+ * This instance will be released as soon as the entire page is consumed, releasing the memory at that point
+ * The ref counts are increased also when a message is ignored for any reason.
+ * */
private class PageCursorInfo
{
// Number of messages existent on this page
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java 2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java 2010-11-09 19:27:38 UTC (rev 9858)
@@ -13,6 +13,8 @@
package org.hornetq.core.server;
+import org.hornetq.core.transaction.Transaction;
+
/**
* A reference to a message.
*
@@ -51,6 +53,11 @@
void decrementDeliveryCount();
Queue getQueue();
+
+ void acknowledge() throws Exception;
+
+ void acknowledge(final Transaction tx) throws Exception;
+
void handled();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-11-09 19:27:38 UTC (rev 9858)
@@ -29,6 +29,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.Transaction;
/**
* A queue that will discard messages if a newer message with the same MessageImpl.HDR_LAST_VALUE_NAME property value.
@@ -92,7 +93,7 @@
try
{
- super.acknowledge(oldRef);
+ oldRef.acknowledge();
}
catch (Exception e)
{
@@ -233,5 +234,21 @@
{
return false;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#acknowledge(org.hornetq.core.server.MessageReference)
+ */
+ public void acknowledge() throws Exception
+ {
+ ref.acknowledge();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#acknowledge(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
+ */
+ public void acknowledge(Transaction tx) throws Exception
+ {
+ ref.acknowledge(tx);
+ }
}
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2010-11-09 19:27:38 UTC (rev 9858)
@@ -17,6 +17,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.MemorySize;
/**
@@ -150,6 +151,23 @@
return false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#acknowledge(org.hornetq.core.server.MessageReference)
+ */
+ public void acknowledge() throws Exception
+ {
+ queue.acknowledge(this);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#acknowledge(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
+ */
+ public void acknowledge(Transaction tx) throws Exception
+ {
+ queue.acknowledge(tx, this);
+ }
+
+
// Public --------------------------------------------------------
@Override
@@ -159,7 +177,6 @@
"]:" +
(getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE");
}
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-09 19:27:38 UTC (rev 9858)
@@ -675,32 +675,46 @@
public void acknowledge(final MessageReference ref) throws Exception
{
- ServerMessage message = ref.getMessage();
-
- boolean durableRef = message.isDurable() && durable;
-
- if (durableRef)
+ if (ref.isPaged())
{
- storageManager.storeAcknowledge(id, message.getMessageID());
+ pageSubscription.ack((PagedReference)ref);
}
+ else
+ {
+ ServerMessage message = ref.getMessage();
+
+ boolean durableRef = message.isDurable() && durable;
+
+ if (durableRef)
+ {
+ storageManager.storeAcknowledge(id, message.getMessageID());
+ }
+ postAcknowledge(ref);
+ }
- postAcknowledge(ref);
}
public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception
{
- ServerMessage message = ref.getMessage();
-
- boolean durableRef = message.isDurable() && durable;
-
- if (durableRef)
+ if (ref.isPaged())
{
- storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
-
- tx.setContainsPersistent();
+ pageSubscription.ackTx(tx, (PagedReference)ref);
}
-
- getRefsOperation(tx).addAck(ref);
+ else
+ {
+ ServerMessage message = ref.getMessage();
+
+ boolean durableRef = message.isDurable() && durable;
+
+ if (durableRef)
+ {
+ storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
+
+ tx.setContainsPersistent();
+ }
+
+ getRefsOperation(tx).addAck(ref);
+ }
}
public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-11-09 19:27:38 UTC (rev 9858)
@@ -521,11 +521,11 @@
if (autoCommitAcks || tx == null)
{
- ref.getQueue().acknowledge(ref);
+ ref.acknowledge();
}
else
{
- ref.getQueue().acknowledge(tx, ref);
+ ref.acknowledge(tx);
}
}
while (ref.getMessage().getMessageID() != messageID);
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-09 19:27:38 UTC (rev 9858)
@@ -315,6 +315,8 @@
private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
{
+
+ System.out.println("PageDir:" + getPageDir());
clearData();
Configuration config = createDefaultConfig();
15 years, 6 months
JBoss hornetq SVN: r9857 - branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-08 23:23:26 -0500 (Mon, 08 Nov 2010)
New Revision: 9857
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
Log:
fix
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-09 04:14:09 UTC (rev 9856)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-09 04:23:26 UTC (rev 9857)
@@ -972,7 +972,7 @@
private long[] getQueueIDs(RouteContextList ctx)
{
List<org.hornetq.core.server.Queue> durableQueues = ctx.getDurableQueues();
- List<org.hornetq.core.server.Queue> nonDurableQueues = ctx.getDurableQueues();
+ List<org.hornetq.core.server.Queue> nonDurableQueues = ctx.getNonDurableQueues();
long ids[] = new long [durableQueues.size() + nonDurableQueues.size()];
int i = 0;
15 years, 6 months
JBoss hornetq SVN: r9856 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/impl and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-08 23:14:09 -0500 (Mon, 08 Nov 2010)
New Revision: 9856
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
changes
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-11-09 04:14:09 UTC (rev 9856)
@@ -16,6 +16,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -60,6 +61,8 @@
boolean page(ServerMessage message, RoutingContext ctx) throws Exception;
+ boolean page(ServerMessage message, RoutingContext ctx, RouteContextList listCtx) throws Exception;
+
Page createPage(final int page) throws Exception;
PagingManager getPagingManager();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-09 04:14:09 UTC (rev 9856)
@@ -45,6 +45,7 @@
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -303,12 +304,17 @@
{
return storeName;
}
-
+
public boolean page(final ServerMessage message, final RoutingContext ctx) throws Exception
{
+ return page(message, ctx, ctx.getContextListing(storeName));
+ }
+
+ public boolean page(final ServerMessage message, final RoutingContext ctx, RouteContextList listCtx) throws Exception
+ {
// The sync on transactions is done on commit only
// TODO: sync on paging
- return page(message, ctx, false);
+ return page(message, ctx, listCtx, false);
}
public void sync() throws Exception
@@ -875,7 +881,7 @@
}
- protected boolean page(ServerMessage message, final RoutingContext ctx, final boolean sync) throws Exception
+ protected boolean page(ServerMessage message, final RoutingContext ctx, RouteContextList listCtx, final boolean sync) throws Exception
{
if (!running)
{
@@ -942,7 +948,7 @@
message.bodyChanged();
}
- pagedMessage = new PagedMessageImpl(message, getQueueIDs(ctx), getTransactionID(ctx));
+ pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx), getTransactionID(ctx));
int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
@@ -963,10 +969,10 @@
}
- private long[] getQueueIDs(RoutingContext ctx)
+ private long[] getQueueIDs(RouteContextList ctx)
{
- List<org.hornetq.core.server.Queue> durableQueues = ctx.getDurableQueues(address);
- List<org.hornetq.core.server.Queue> nonDurableQueues = ctx.getDurableQueues(address);
+ List<org.hornetq.core.server.Queue> durableQueues = ctx.getDurableQueues();
+ List<org.hornetq.core.server.Queue> nonDurableQueues = ctx.getDurableQueues();
long ids[] = new long [durableQueues.size() + nonDurableQueues.size()];
int i = 0;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-09 04:14:09 UTC (rev 9856)
@@ -47,6 +47,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.RoutingContextImpl;
@@ -840,12 +841,17 @@
Transaction tx = context.getTransaction();
- for (SimpleString add: context.getAddresses())
+
+ for (Map.Entry<SimpleString, RouteContextList> entry: context.getContexListing().entrySet())
{
+ PagingStore store = pagingManager.getPageStore(entry.getKey());
- PagingStore store = pagingManager.getPageStore(add);
+ if (store.page(message, context, entry.getValue()))
+ {
+ continue;
+ }
- for (Queue queue : context.getNonDurableQueues(add))
+ for (Queue queue : entry.getValue().getNonDurableQueues())
{
MessageReference reference = message.createReference(queue);
@@ -861,7 +867,7 @@
message.incrementRefCount();
}
- Iterator<Queue> iter = context.getDurableQueues(add).iterator();
+ Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();
while (iter.hasNext())
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java 2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java 2010-11-09 04:14:09 UTC (rev 9856)
@@ -14,6 +14,7 @@
package org.hornetq.core.server;
import java.util.List;
+import java.util.Map;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
@@ -35,8 +36,10 @@
void addQueue(SimpleString address, Queue queue);
- Pair<SimpleString, RouteContextList> getContexListing();
+ Map<SimpleString, RouteContextList> getContexListing();
+ RouteContextList getContextListing(SimpleString address);
+
List<Queue> getNonDurableQueues(SimpleString address);
List<Queue> getDurableQueues(SimpleString address);
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2010-11-09 04:14:09 UTC (rev 9856)
@@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -199,12 +200,14 @@
buff.putLong(remoteQueueID);
message.putBytesProperty(idsHeaderName, ids);
+
+ List<Queue> durableQueuesOnContext = context.getDurableQueues(address);
- if (!context.getDurableQueues().contains(storeAndForwardQueue))
+ if (!durableQueuesOnContext.contains(storeAndForwardQueue))
{
// There can be many remote bindings for the same node, we only want to add the message once to
// the s & f queue for that node
- context.addQueue(address, storeAndForwardQueue);
+ durableQueuesOnContext.add(storeAndForwardQueue);
}
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2010-11-09 04:14:09 UTC (rev 9856)
@@ -37,7 +37,7 @@
{
// The pair here is Durable and NonDurable
- private Map<SimpleString, ContextListing> map = new HashMap<SimpleString, ContextListing>();
+ private Map<SimpleString, RouteContextList> map = new HashMap<SimpleString, RouteContextList>();
private Transaction transaction;
@@ -60,23 +60,23 @@
public void addQueue(final SimpleString address, final Queue queue)
{
- ContextListing listing = getContextListing(address);
+ RouteContextList listing = getContextListing(address);
if (queue.isDurable())
{
- listing.durableQueues.add(queue);
+ listing.getDurableQueues().add(queue);
}
else
{
- listing.durableQueues.add(queue);
+ listing.getNonDurableQueues().add(queue);
}
queueCount++;
}
- private ContextListing getContextListing(SimpleString address)
+ public RouteContextList getContextListing(SimpleString address)
{
- ContextListing listing = map.get(address);
+ RouteContextList listing = map.get(address);
if (listing == null)
{
listing = new ContextListing();
@@ -97,12 +97,12 @@
public List<Queue> getNonDurableQueues(SimpleString address)
{
- return getContextListing(address).nonDurableQueues;
+ return getContextListing(address).getNonDurableQueues();
}
public List<Queue> getDurableQueues(SimpleString address)
{
- return getContextListing(address).durableQueues;
+ return getContextListing(address).getDurableQueues();
}
public int getQueueCount()
@@ -113,14 +113,9 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.RoutingContext#getAddresses()
*/
- public Pair<SimpleString, ContextListing>[] getAddresses()
+ public Map<SimpleString, RouteContextList> getContexListing()
{
- Object x = new Pair(a, b);
-
-
- Pair<SimpleString, ContextListing> [] contextListing = new Pair<SimpleString, ContextListing>[1];
- // TODO Auto-generated method stub
- return null;
+ return this.map;
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-09 04:14:09 UTC (rev 9856)
@@ -17,12 +17,10 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,11 +40,9 @@
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.PagingStoreFactory;
-import org.hornetq.core.paging.impl.PageImpl;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.paging.impl.PagingStoreImpl;
@@ -1014,9 +1010,9 @@
syncNonTransactional);
}
- protected boolean page(ServerMessage message, org.hornetq.core.server.RoutingContext ctx, boolean sync) throws Exception
+ protected boolean page(ServerMessage message, org.hornetq.core.server.RoutingContext ctx, org.hornetq.core.server.RouteContextList listCtx, boolean sync) throws Exception
{
- boolean paged = super.page(message, ctx, sync);
+ boolean paged = super.page(message, ctx, listCtx, sync);
if (paged)
{
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-09 04:14:09 UTC (rev 9856)
@@ -515,7 +515,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg, ctx));
+ Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
PagedReference readMessage = iterator.next();
@@ -552,7 +552,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg, ctx));
+ Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
}
PagedReference readMessage = iterator.next();
@@ -588,7 +588,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg, ctx));
+ Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
}
PagedReference readMessage = iterator.next();
@@ -1091,7 +1091,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg, ctx));
+ Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
}
return pageStore.getNumberOfPages();
@@ -1202,7 +1202,7 @@
ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
msg.putIntProperty("key", i);
- pageStore.page(msg, ctx);
+ pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS));
}
}
15 years, 6 months
JBoss hornetq SVN: r9855 - in branches/Branch_New_Paging: src/main/org/hornetq/core/postoffice/impl and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-08 22:04:54 -0500 (Mon, 08 Nov 2010)
New Revision: 9855
Added:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
backup (it's not compiling)
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-08 21:21:08 UTC (rev 9854)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-09 03:04:54 UTC (rev 9855)
@@ -965,15 +965,17 @@
private long[] getQueueIDs(RoutingContext ctx)
{
- long ids[] = new long [ctx.getDurableQueues().size() + ctx.getNonDurableQueues().size()];
+ List<org.hornetq.core.server.Queue> durableQueues = ctx.getDurableQueues(address);
+ List<org.hornetq.core.server.Queue> nonDurableQueues = ctx.getDurableQueues(address);
+ long ids[] = new long [durableQueues.size() + nonDurableQueues.size()];
int i = 0;
- for (org.hornetq.core.server.Queue q : ctx.getDurableQueues())
+ for (org.hornetq.core.server.Queue q : durableQueues)
{
ids[i++] = q.getID();
}
- for (org.hornetq.core.server.Queue q : ctx.getNonDurableQueues())
+ for (org.hornetq.core.server.Queue q : nonDurableQueues)
{
ids[i++] = q.getID();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-08 21:21:08 UTC (rev 9854)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-09 03:04:54 UTC (rev 9855)
@@ -839,83 +839,89 @@
final List<MessageReference> refs = new ArrayList<MessageReference>();
Transaction tx = context.getTransaction();
-
- for (Queue queue : context.getNonDurableQueues())
+
+ for (SimpleString add: context.getAddresses())
{
- MessageReference reference = message.createReference(queue);
-
- refs.add(reference);
-
- if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
+
+ PagingStore store = pagingManager.getPageStore(add);
+
+ for (Queue queue : context.getNonDurableQueues(add))
{
- Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
-
- reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ MessageReference reference = message.createReference(queue);
+
+ refs.add(reference);
+
+ if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
+ {
+ Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+
+ reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ }
+
+ message.incrementRefCount();
}
-
- message.incrementRefCount();
- }
-
- Iterator<Queue> iter = context.getDurableQueues().iterator();
-
- while (iter.hasNext())
- {
- Queue queue = iter.next();
-
- MessageReference reference = message.createReference(queue);
-
- refs.add(reference);
-
- if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
+
+ Iterator<Queue> iter = context.getDurableQueues(add).iterator();
+
+ while (iter.hasNext())
{
- Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
-
- reference.setScheduledDeliveryTime(scheduledDeliveryTime);
- }
-
- if (message.isDurable())
- {
- int durableRefCount = message.incrementDurableRefCount();
-
- if (durableRefCount == 1)
+ Queue queue = iter.next();
+
+ MessageReference reference = message.createReference(queue);
+
+ refs.add(reference);
+
+ if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
{
+ Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+
+ reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ }
+
+ if (message.isDurable())
+ {
+ int durableRefCount = message.incrementDurableRefCount();
+
+ if (durableRefCount == 1)
+ {
+ if (tx != null)
+ {
+ storageManager.storeMessageTransactional(tx.getID(), message);
+ }
+ else
+ {
+ storageManager.storeMessage(message);
+ }
+ }
+
if (tx != null)
{
- storageManager.storeMessageTransactional(tx.getID(), message);
+ storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
+
+ tx.setContainsPersistent();
}
else
{
- storageManager.storeMessage(message);
+ storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
}
- }
-
- if (tx != null)
- {
- storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
-
- tx.setContainsPersistent();
- }
- else
- {
- storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
- }
-
- if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
- {
- if (tx != null)
+
+ if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
{
- storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
+ if (tx != null)
+ {
+ storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
+ }
+ else
+ {
+ storageManager.updateScheduledDeliveryTime(reference);
+ }
}
- else
- {
- storageManager.updateScheduledDeliveryTime(reference);
- }
}
+
+ message.incrementRefCount();
}
-
- message.incrementRefCount();
}
-
+
if (tx != null)
{
tx.addOperation(new AddOperation(refs));
Added: branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java 2010-11-09 03:04:54 UTC (rev 9855)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import java.util.List;
+
+/**
+ * This is a simple datatype containing the list of a routing context
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public interface RouteContextList
+{
+
+ List<Queue> getDurableQueues();
+
+ List<Queue> getNonDurableQueues();
+
+}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java 2010-11-08 21:21:08 UTC (rev 9854)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java 2010-11-09 03:04:54 UTC (rev 9855)
@@ -15,12 +15,15 @@
import java.util.List;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.transaction.Transaction;
/**
* A RoutingContext
*
* @author Tim Fox
+ * @author Clebert Suconic
*
*
*/
@@ -30,14 +33,18 @@
void setTransaction(Transaction transaction);
- void addQueue(Queue queue);
+ void addQueue(SimpleString address, Queue queue);
- List<Queue> getNonDurableQueues();
+ Pair<SimpleString, RouteContextList> getContexListing();
+
+ List<Queue> getNonDurableQueues(SimpleString address);
- List<Queue> getDurableQueues();
+ List<Queue> getDurableQueues(SimpleString address);
int getQueueCount();
void clear();
+
+
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2010-11-08 21:21:08 UTC (rev 9854)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2010-11-09 03:04:54 UTC (rev 9855)
@@ -204,7 +204,7 @@
{
// There can be many remote bindings for the same node, we only want to add the message once to
// the s & f queue for that node
- context.addQueue(storeAndForwardQueue);
+ context.addQueue(address, storeAndForwardQueue);
}
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-08 21:21:08 UTC (rev 9854)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-09 03:04:54 UTC (rev 9855)
@@ -267,7 +267,7 @@
public void route(final ServerMessage message, final RoutingContext context) throws Exception
{
- context.addQueue(this);
+ context.addQueue(address, this);
}
// Queue implementation ----------------------------------------------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2010-11-08 21:21:08 UTC (rev 9854)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2010-11-09 03:04:54 UTC (rev 9855)
@@ -14,9 +14,15 @@
package org.hornetq.core.server.impl;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.transaction.Transaction;
@@ -29,10 +35,10 @@
*/
public class RoutingContextImpl implements RoutingContext
{
- private final List<Queue> nonDurableQueues = new ArrayList<Queue>(1);
+
+ // The pair here is Durable and NonDurable
+ private Map<SimpleString, ContextListing> map = new HashMap<SimpleString, ContextListing>();
- private final List<Queue> durableQueues = new ArrayList<Queue>(1);
-
private Transaction transaction;
private int queueCount;
@@ -41,35 +47,42 @@
{
this.transaction = transaction;
}
-
+
public void clear()
{
transaction = null;
- nonDurableQueues.clear();
-
- durableQueues.clear();
-
+ map.clear();
+
queueCount = 0;
}
- public void addQueue(final Queue queue)
+ public void addQueue(final SimpleString address, final Queue queue)
{
+
+ ContextListing listing = getContextListing(address);
+
if (queue.isDurable())
{
- durableQueues.add(queue);
+ listing.durableQueues.add(queue);
}
else
{
- nonDurableQueues.add(queue);
+ listing.durableQueues.add(queue);
}
queueCount++;
}
-
- public void addDurableQueue(final Queue queue)
+
+ private ContextListing getContextListing(SimpleString address)
{
- durableQueues.add(queue);
+ ContextListing listing = map.get(address);
+ if (listing == null)
+ {
+ listing = new ContextListing();
+ map.put(address, listing);
+ }
+ return listing;
}
public Transaction getTransaction()
@@ -82,14 +95,14 @@
transaction = tx;
}
- public List<Queue> getNonDurableQueues()
+ public List<Queue> getNonDurableQueues(SimpleString address)
{
- return nonDurableQueues;
+ return getContextListing(address).nonDurableQueues;
}
- public List<Queue> getDurableQueues()
+ public List<Queue> getDurableQueues(SimpleString address)
{
- return durableQueues;
+ return getContextListing(address).durableQueues;
}
public int getQueueCount()
@@ -97,4 +110,41 @@
return queueCount;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.RoutingContext#getAddresses()
+ */
+ public Pair<SimpleString, ContextListing>[] getAddresses()
+ {
+ Object x = new Pair(a, b);
+
+
+ Pair<SimpleString, ContextListing> [] contextListing = new Pair<SimpleString, ContextListing>[1];
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+
+ private class ContextListing implements RouteContextList
+ {
+ private List<Queue> durableQueue = new ArrayList<Queue>(1);
+
+ private List<Queue> nonDurableQueue = new ArrayList<Queue>(1);
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.RouteContextList#getDurableQueues()
+ */
+ public List<Queue> getDurableQueues()
+ {
+ return durableQueue;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.RouteContextList#getNonDurableQueues()
+ */
+ public List<Queue> getNonDurableQueues()
+ {
+ return nonDurableQueue;
+ }
+ }
+
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-08 21:21:08 UTC (rev 9854)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-09 03:04:54 UTC (rev 9855)
@@ -631,11 +631,11 @@
private RoutingContextImpl generateCTX(Transaction tx)
{
RoutingContextImpl ctx = new RoutingContextImpl(tx);
- ctx.addDurableQueue(queue);
+ ctx.addQueue(ADDRESS, queue);
for (Queue q : this.queueList)
{
- ctx.addQueue(q);
+ ctx.addQueue(ADDRESS, q);
}
return ctx;
15 years, 6 months
JBoss hornetq SVN: r9854 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 9 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-08 16:21:08 -0500 (Mon, 08 Nov 2010)
New Revision: 9854
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/ServerMessage.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
changes
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -30,6 +30,9 @@
{
ServerMessage getMessage();
+ /** The queues that were routed during paging */
+ long[] getQueueIDs();
+
void initMessage(StorageManager storageManager);
long getTransactionID();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -13,11 +13,10 @@
package org.hornetq.core.paging;
-import java.util.List;
-
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -59,10 +58,8 @@
void sync() throws Exception;
- boolean page(List<ServerMessage> messages, long transactionId) throws Exception;
+ boolean page(ServerMessage message, RoutingContext ctx) throws Exception;
- boolean page(ServerMessage message) throws Exception;
-
Page createPage(final int page) throws Exception;
PagingManager getPagingManager();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -32,6 +32,7 @@
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Future;
import org.hornetq.utils.SoftValueHashMap;
@@ -135,6 +136,12 @@
else if (retPos != null)
{
cursorPos = retPos.getPosition();
+
+ if (!routed(retPos.getPagedMessage(), cursor))
+ {
+ cursor.positionIgnored(cursorPos);
+ }
+ else
if (retPos.getPagedMessage().getTransactionID() != 0)
{
PageTransactionInfo tx = pagingManager.getTransaction(retPos.getPagedMessage().getTransactionID());
@@ -160,6 +167,20 @@
}
}
}
+
+ private boolean routed(PagedMessage message, PageSubscription subs)
+ {
+ long id = subs.getId();
+
+ for (long qid : message.getQueueIDs())
+ {
+ if (qid == id)
+ {
+ return true;
+ }
+ }
+ return false;
+ }
private PagedReferenceImpl internalGetNext(final PagePosition pos)
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -49,17 +49,20 @@
private byte[] largeMessageLazyData;
private ServerMessage message;
+
+ private long queueIDs[];
private long transactionID = 0;
- public PagedMessageImpl(final ServerMessage message, final long transactionID)
+ public PagedMessageImpl(final ServerMessage message, final long[] queueIDs, final long transactionID)
{
- this.message = message;
+ this(message, queueIDs);
this.transactionID = transactionID;
}
- public PagedMessageImpl(final ServerMessage message)
+ public PagedMessageImpl(final ServerMessage message, final long[] queueIDs)
{
+ this.queueIDs = queueIDs;
this.message = message;
}
@@ -87,6 +90,11 @@
{
return transactionID;
}
+
+ public long[] getQueueIDs()
+ {
+ return queueIDs;
+ }
// EncodingSupport implementation --------------------------------
@@ -112,6 +120,15 @@
message.decode(buffer);
}
+
+ int queueIDsSize = buffer.readInt();
+
+ queueIDs = new long[queueIDsSize];
+
+ for (int i = 0 ; i < queueIDsSize; i++)
+ {
+ queueIDs[i] = buffer.readLong();
+ }
}
public void encode(final HornetQBuffer buffer)
@@ -123,11 +140,19 @@
buffer.writeInt(message.getEncodeSize());
message.encode(buffer);
+
+ buffer.writeInt(queueIDs.length);
+
+ for (int i = 0 ; i < queueIDs.length; i++)
+ {
+ buffer.writeLong(queueIDs[i]);
+ }
}
public int getEncodeSize()
{
- return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize();
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize() +
+ DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
}
// Package protected ---------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -25,7 +25,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.api.core.SimpleString;
@@ -46,6 +45,7 @@
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -304,19 +304,13 @@
return storeName;
}
- public boolean page(final List<ServerMessage> message, final long transactionID) throws Exception
+ public boolean page(final ServerMessage message, final RoutingContext ctx) throws Exception
{
// The sync on transactions is done on commit only
- return page(message, transactionID, false);
+ // TODO: sync on paging
+ return page(message, ctx, false);
}
- public boolean page(final ServerMessage message) throws Exception
- {
- // If non Durable, there is no need to sync as there is no requirement for persistence for those messages in case
- // of crash
- return page(Arrays.asList(message), -1, syncNonTransactional && message.isDurable());
- }
-
public void sync() throws Exception
{
lock.readLock().lock();
@@ -881,7 +875,7 @@
}
- protected boolean page(final List<ServerMessage> messages, final long transactionID, final boolean sync) throws Exception
+ protected boolean page(ServerMessage message, final RoutingContext ctx, final boolean sync) throws Exception
{
if (!running)
{
@@ -939,37 +933,27 @@
return false;
}
- for (ServerMessage message : messages)
+ PagedMessage pagedMessage;
+
+ if (!message.isDurable())
{
- PagedMessage pagedMessage;
+ // The address should never be transient when paging (even for non-persistent messages when paging)
+ // This will force everything to be persisted
+ message.bodyChanged();
+ }
- if (!message.isDurable())
- {
- // The address should never be transient when paging (even for non-persistent messages when paging)
- // This will force everything to be persisted
- message.bodyChanged();
- }
+ pagedMessage = new PagedMessageImpl(message, getQueueIDs(ctx), getTransactionID(ctx));
- if (transactionID != -1)
- {
- pagedMessage = new PagedMessageImpl(message, transactionID);
- }
- else
- {
- pagedMessage = new PagedMessageImpl(message);
- }
+ int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
- int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
+ if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
+ {
+ // Make sure nothing is currently validating or using currentPage
+ openNewPage();
+ }
- if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
- {
- // Make sure nothing is currently validating or using currentPage
- openNewPage();
- }
+ currentPage.write(pagedMessage);
- currentPage.write(pagedMessage);
- }
-
return true;
}
finally
@@ -979,6 +963,36 @@
}
+ private long[] getQueueIDs(RoutingContext ctx)
+ {
+ long ids[] = new long [ctx.getDurableQueues().size() + ctx.getNonDurableQueues().size()];
+ int i = 0;
+
+ for (org.hornetq.core.server.Queue q : ctx.getDurableQueues())
+ {
+ ids[i++] = q.getID();
+ }
+
+ for (org.hornetq.core.server.Queue q : ctx.getNonDurableQueues())
+ {
+ ids[i++] = q.getID();
+ }
+ return ids;
+ }
+
+ private long getTransactionID(RoutingContext ctx)
+ {
+ Transaction tx = ctx.getTransaction();
+ if (tx == null)
+ {
+ return 0l;
+ }
+ else
+ {
+ return tx.getID();
+ }
+ }
+
/**
* This method will remove files from the page system and and route them, doing it transactionally
*
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -227,16 +227,6 @@
{
return pageStore;
}
-
- public void paged(final ServerMessage message)
- {
-
- }
-
- public boolean page(final ServerMessage message) throws Exception
- {
- return pageStore.page(message);
- }
public void route(final ServerMessage message, final RoutingContext context) throws Exception
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/ServerMessage.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/ServerMessage.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -56,10 +56,6 @@
PagingStore getPagingStore();
- boolean page() throws Exception;
-
- boolean page(long transactionID) throws Exception;
-
boolean storeIsPaging();
void encodeMessageIDToBuffer();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -249,30 +249,6 @@
return pagingStore;
}
- public boolean page() throws Exception
- {
- if (pagingStore != null)
- {
- return pagingStore.page(this);
- }
- else
- {
- return false;
- }
- }
-
- public boolean page(final long transactionID) throws Exception
- {
- if (pagingStore != null)
- {
- return pagingStore.page(Arrays.asList((ServerMessage)this), transactionID);
- }
- else
- {
- return false;
- }
- }
-
public boolean storeIsPaging()
{
if (pagingStore != null)
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -1014,9 +1014,9 @@
syncNonTransactional);
}
- protected boolean page(final List<ServerMessage> messages, final long transactionID, final boolean sync) throws Exception
+ protected boolean page(ServerMessage message, org.hornetq.core.server.RoutingContext ctx, boolean sync) throws Exception
{
- boolean paged = super.page(messages, transactionID, sync);
+ boolean paged = super.page(message, ctx, sync);
if (paged)
{
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -42,11 +42,14 @@
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.tests.unit.core.postoffice.impl.FakeQueue;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.LinkedListIterator;
@@ -70,6 +73,8 @@
private HornetQServer server;
private Queue queue;
+
+ private List<Queue> queueList;
private static final int PAGE_MAX = -1;
@@ -155,10 +160,6 @@
final int NUM_MESSAGES = 100;
- int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
- System.out.println("NumberOfPages = " + numberOfPages);
-
PageSubscription cursorEven = createNonPersistentCursor(new Filter()
{
@@ -205,6 +206,10 @@
});
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
queue.getPageSubscription().close();
PagedReference msg;
@@ -493,6 +498,8 @@
.getSubscription(queue.getID());
System.out.println("Cursor: " + cursor);
+
+ RoutingContextImpl ctx = generateCTX();
LinkedListIterator<PagedReference> iterator = cursor.iterator();
@@ -508,7 +515,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg));
+ Assert.assertTrue(pageStore.page(msg, ctx));
PagedReference readMessage = iterator.next();
@@ -545,7 +552,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg));
+ Assert.assertTrue(pageStore.page(msg, ctx));
}
PagedReference readMessage = iterator.next();
@@ -581,7 +588,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg));
+ Assert.assertTrue(pageStore.page(msg, ctx));
}
PagedReference readMessage = iterator.next();
@@ -615,6 +622,24 @@
assertFalse(lookupPageStore(ADDRESS).isPaging());
}
+
+ private RoutingContextImpl generateCTX()
+ {
+ return generateCTX(null);
+ }
+
+ private RoutingContextImpl generateCTX(Transaction tx)
+ {
+ RoutingContextImpl ctx = new RoutingContextImpl(tx);
+ ctx.addDurableQueue(queue);
+
+ for (Queue q : this.queueList)
+ {
+ ctx.addQueue(q);
+ }
+
+ return ctx;
+ }
/**
* @throws Exception
@@ -783,15 +808,19 @@
final int NUM_MESSAGES = 100;
- int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
- System.out.println("NumberOfPages = " + numberOfPages);
-
PageCursorProvider cursorProvider = lookupCursorProvider();
PageSubscription cursor = cursorProvider.createSubscription(11, null, false);
PageSubscriptionImpl cursor2 = (PageSubscriptionImpl)cursorProvider.createSubscription(12, null, false);
+
+ this.queueList.add(new FakeQueue(new SimpleString("a"), 11));
+
+ this.queueList.add(new FakeQueue(new SimpleString("b"), 12));
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
queue.getPageSubscription().close();
PagedReference msg;
@@ -856,16 +885,18 @@
final int NUM_MESSAGES = 100;
+ PageCursorProvider cursorProvider = lookupCursorProvider();
+
+ PageSubscription cursor = cursorProvider.createSubscription(2, null, false);
+
+ queueList.add(new FakeQueue(new SimpleString("tmp"), 2));
+
int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProvider cursorProvider = lookupCursorProvider();
-
PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
- PageSubscription cursor = cursorProvider.createSubscription(2, null, false);
-
queue.getPageSubscription().close();
PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
@@ -1044,6 +1075,8 @@
PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
pageStore.startPaging();
+
+ RoutingContext ctx = generateCTX();
for (int i = start; i < start + numMessages; i++)
{
@@ -1058,7 +1091,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg));
+ Assert.assertTrue(pageStore.page(msg, ctx));
}
return pageStore.getNumberOfPages();
@@ -1077,12 +1110,23 @@
// Protected -----------------------------------------------------
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+ server = null;
+ queue = null;
+ queueList = null;
+ super.tearDown();
+ }
+
protected void setUp() throws Exception
{
super.setUp();
OperationContextImpl.clearContext();
System.out.println("Tmp:" + getTemporaryDir());
+ queueList = new ArrayList<Queue>();
+
createServer();
}
@@ -1117,7 +1161,9 @@
*/
private PageSubscription createNonPersistentCursor(Filter filter) throws Exception
{
- return lookupCursorProvider().createSubscription(server.getStorageManager().generateUniqueID(), filter, false);
+ long id = server.getStorageManager().generateUniqueID();
+ queueList.add(new FakeQueue(new SimpleString(filter.toString()), id));
+ return lookupCursorProvider().createSubscription(id, filter, false);
}
/**
@@ -1145,7 +1191,10 @@
final int NUM_MESSAGES,
final int messageSize) throws Exception
{
- List<ServerMessage> messages = new ArrayList<ServerMessage>();
+
+ TransactionImpl txImpl = new TransactionImpl(pgParameter.getTransactionID(), null, null);
+
+ RoutingContext ctx = generateCTX(txImpl);
for (int i = start; i < start + NUM_MESSAGES; i++)
{
@@ -1153,20 +1202,11 @@
ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
msg.putIntProperty("key", i);
- messages.add(msg);
+ pageStore.page(msg, ctx);
}
- pageStore.page(messages, pgParameter.getTransactionID());
}
- protected void tearDown() throws Exception
- {
- server.stop();
- server = null;
- queue = null;
- super.tearDown();
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -288,7 +288,7 @@
replicatedJournal.appendAddRecordTransactional(23, 24, (byte)1, new FakeData());
- PagedMessage pgmsg = new PagedMessageImpl(msg, -1);
+ PagedMessage pgmsg = new PagedMessageImpl(msg, new long[0]);
manager.pageWrite(pgmsg, 1);
manager.pageWrite(pgmsg, 2);
manager.pageWrite(pgmsg, 3);
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -223,7 +223,7 @@
msg.setAddress(simpleDestination);
- page.write(new PagedMessageImpl(msg));
+ page.write(new PagedMessageImpl(msg, new long [0]));
Assert.assertEquals(initialNumberOfMessages + i + 1, page.getNumberOfMessages());
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -28,6 +28,7 @@
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -81,11 +82,11 @@
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
- Assert.assertFalse(store.page(msg));
+ Assert.assertFalse(store.page(msg, new RoutingContextImpl(null)));
store.startPaging();
- Assert.assertTrue(store.page(msg));
+ Assert.assertTrue(store.page(msg, new RoutingContextImpl(null)));
Page page = store.depage();
@@ -107,7 +108,7 @@
Assert.assertNull(store.depage());
- Assert.assertFalse(store.page(msg));
+ Assert.assertFalse(store.page(msg, new RoutingContextImpl(null)));
}
// Package protected ---------------------------------------------
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -64,6 +64,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -202,7 +203,7 @@
Assert.assertTrue(storeImpl.isPaging());
- Assert.assertTrue(storeImpl.page(msg));
+ Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -265,7 +266,7 @@
ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
- Assert.assertTrue(storeImpl.page(msg));
+ Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
}
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -345,7 +346,7 @@
ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
- Assert.assertTrue(storeImpl.page(msg));
+ Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
}
Assert.assertEquals(2, storeImpl.getNumberOfPages());
@@ -381,7 +382,7 @@
ServerMessage msg = createMessage(1, storeImpl, destination, buffers.get(0));
- Assert.assertTrue(storeImpl.page(msg));
+ Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
Page newPage = storeImpl.depage();
@@ -399,11 +400,11 @@
Assert.assertFalse(storeImpl.isPaging());
- Assert.assertFalse(storeImpl.page(msg));
+ Assert.assertFalse(storeImpl.page(msg, new RoutingContextImpl(null)));
storeImpl.startPaging();
- Assert.assertTrue(storeImpl.page(msg));
+ Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
Page page = storeImpl.depage();
@@ -499,7 +500,7 @@
// This is possible because the depage thread is not actually reading the pages.
// Just using the internal API to remove it from the page file system
ServerMessage msg = createMessage(id, storeImpl, destination, createRandomBuffer(id, 5));
- if (storeImpl.page(msg))
+ if (storeImpl.page(msg, new RoutingContextImpl(null)))
{
buffers.put(id, msg);
}
@@ -644,7 +645,7 @@
long lastMessageId = messageIdGenerator.incrementAndGet();
ServerMessage lastMsg = createMessage(lastMessageId, storeImpl, destination, createRandomBuffer(lastMessageId, 5));
- storeImpl2.page(lastMsg);
+ storeImpl2.page(lastMsg, new RoutingContextImpl(null));
buffers2.put(lastMessageId, lastMsg);
Page lastPage = null;
@@ -752,7 +753,7 @@
// Just using the internal API to remove it from the page file system
ServerMessage msg = createMessage(i, storeImpl, destination, createRandomBuffer(i, 1024));
msg.putLongProperty("count", i);
- while (!storeImpl.page(msg))
+ while (!storeImpl.page(msg, new RoutingContextImpl(null)))
{
storeImpl.startPaging();
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-11-08 21:21:08 UTC (rev 9854)
@@ -92,10 +92,18 @@
}
private final SimpleString name;
+
+ private final long id;
public FakeQueue(final SimpleString name)
{
+ this(name, 0);
+ }
+
+ public FakeQueue(final SimpleString name, final long id)
+ {
this.name = name;
+ this.id = id;
}
/* (non-Javadoc)
@@ -354,8 +362,7 @@
*/
public long getID()
{
- // TODO Auto-generated method stub
- return 0;
+ return id;
}
/* (non-Javadoc)
15 years, 6 months