Author: clebert.suconic(a)jboss.com
Date: 2010-11-09 18:58:11 -0500 (Tue, 09 Nov 2010)
New Revision: 9862
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
changes
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-09
23:38:48 UTC (rev 9861)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-09
23:58:11 UTC (rev 9862)
@@ -51,6 +51,7 @@
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
@@ -988,7 +989,7 @@
return ids;
}
- private long getTransactionID(RoutingContext ctx)
+ private long getTransactionID(RoutingContext ctx) throws Exception
{
Transaction tx = ctx.getTransaction();
if (tx == null)
@@ -997,10 +998,68 @@
}
else
{
+ if (tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION) == null)
+ {
+ PageTransactionInfo pgTX = new PageTransactionInfoImpl(tx.getID());
+ System.out.println("Creating pageTransaction " +
pgTX.getTransactionID());
+ storageManager.storePageTransaction(tx.getID(), pgTX);
+ pagingManager.addTransaction(pgTX);
+ tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
+ tx.addOperation(new FinishPageMessageOperation());
+
+ tx.setContainsPersistent();
+ }
+
return tx.getID();
}
}
+ /** Transaction hook that finishes the PageTransactionInfo kept under the PAGE_TRANSACTION property of a transaction. */
+ private static class FinishPageMessageOperation implements TransactionOperation
+ {
+ /** Commits the page transaction stored on tx, if one is present; does nothing otherwise. */
+ public void afterCommit(final Transaction tx)
+ {
+ // If part of the transaction goes to the queue, and part goes to paging, we
can't let depage start for the
+ // transaction until all the messages were added to the queue
+ // or else we could deliver the messages out of order
+ // The property may be absent, hence the null check before committing.
+ PageTransactionInfo pageTransaction =
(PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+ if (pageTransaction != null)
+ {
+ pageTransaction.commit();
+ }
+ }
+ /** Intentionally empty: no action is taken after prepare. */
+ public void afterPrepare(final Transaction tx)
+ {
+ }
+ /** Rolls back the page transaction stored on tx, but only when tx reports State.PREPARED. */
+ public void afterRollback(final Transaction tx)
+ {
+ PageTransactionInfo pageTransaction =
(PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+ // NOTE(review): a rollback from any state other than PREPARED leaves the page transaction untouched here - confirm this is intended.
+ if (tx.getState() == State.PREPARED && pageTransaction != null)
+ {
+ pageTransaction.rollback();
+ }
+ }
+ /** Intentionally empty: no action is taken before commit. */
+ public void beforeCommit(final Transaction tx) throws Exception
+ {
+ }
+ /** Intentionally empty: no action is taken before prepare. */
+ public void beforePrepare(final Transaction tx) throws Exception
+ {
+ }
+ /** Intentionally empty: no action is taken before rollback. */
+ public void beforeRollback(final Transaction tx) throws Exception
+ {
+ }
+
+ }
+
/**
* This method will remove files from the page system and route them, doing it
transactionally
*
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-09
23:38:48 UTC (rev 9861)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-09
23:58:11 UTC (rev 9862)
@@ -1267,13 +1267,20 @@
{
return;
}
-
- int nmessages = 0;
- while (nmessages < MAX_DELIVERIES_IN_LOOP && pageIterator.hasNext())
+
+ int msgsToDeliver = MAX_DELIVERIES_IN_LOOP - (messageReferences.size() +
getScheduledCount() + concurrentQueue.size());
+
+ if (msgsToDeliver > 0)
{
- nmessages ++;
- addTail(pageIterator.next(), false);
- pageIterator.remove();
+ System.out.println("Depaging " + msgsToDeliver + "
messages");
+
+ int nmessages = 0;
+ while (nmessages < msgsToDeliver && pageIterator.hasNext())
+ {
+ nmessages ++;
+ addTail(pageIterator.next(), false);
+ pageIterator.remove();
+ }
}
deliverAsync();