Author: clebert.suconic(a)jboss.com
Date: 2010-11-10 14:25:04 -0500 (Wed, 10 Nov 2010)
New Revision: 9866
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
Log:
async deliveries after paging
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-10
17:28:31 UTC (rev 9865)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-10
19:25:04 UTC (rev 9866)
@@ -948,8 +948,10 @@
// This will force everything to be persisted
message.bodyChanged();
}
+
+ Transaction tx = ctx.getTransaction();
- pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx),
getTransactionID(ctx));
+ pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx),
getTransactionID(tx));
int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
@@ -989,9 +991,8 @@
return ids;
}
- private long getTransactionID(RoutingContext ctx) throws Exception
+ private long getTransactionID(Transaction tx) throws Exception
{
- Transaction tx = ctx.getTransaction();
if (tx == null)
{
return 0l;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-10
17:28:31 UTC (rev 9865)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-10
19:25:04 UTC (rev 9866)
@@ -15,9 +15,11 @@
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -59,6 +61,7 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
@@ -834,6 +837,28 @@
processRoute(message, context, false);
}
}
+
+
+ private class PageDelivery extends TransactionOperationAbstract
+ {
+ private Set<Queue> queues = new HashSet<Queue>();
+
+ public void addQueues(List<Queue> queueList)
+ {
+ queues.addAll(queueList);
+ }
+
+ public void afterCommit(Transaction tx)
+ {
+ // We need to try delivering async after paging, or nothing may start a delivery after paging since nothing is going towards the queues
+ // The queue will try to depage in case it's empty
+ for (Queue queue : queues)
+ {
+ queue.deliverAsync();
+ }
+ }
+
+ }
private void processRoute(final ServerMessage message, final RoutingContext context,
final boolean direct) throws Exception
{
@@ -848,6 +873,50 @@
if (store.page(message, context, entry.getValue()))
{
+
+ if (tx != null)
+ {
+ PageDelivery delivery =
(PageDelivery)tx.getProperty(TransactionPropertyIndexes.PAGE_DELIVERY);
+ if (delivery == null)
+ {
+ delivery = new PageDelivery();
+ tx.putProperty(TransactionPropertyIndexes.PAGE_DELIVERY, delivery);
+ tx.addOperation(delivery);
+ }
+
+ delivery.addQueues(entry.getValue().getDurableQueues());
+ delivery.addQueues(entry.getValue().getNonDurableQueues());
+ }
+ else
+ {
+
+ List<Queue> durableQueues = entry.getValue().getDurableQueues();
+ List<Queue> nonDurableQueues =
entry.getValue().getNonDurableQueues();
+
+ final List<Queue> queues = new
ArrayList<Queue>(durableQueues.size() + nonDurableQueues.size());
+
+ queues.addAll(durableQueues);
+ queues.addAll(nonDurableQueues);
+
+ storageManager.afterCompleteOperations(new IOAsyncTask()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ for (Queue queue : queues)
+ {
+ // in case of paging, we need to kick asynchronous delivery to try delivering
+ queue.deliverAsync();
+ }
+ }
+ });
+ }
+
+
continue;
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java 2010-11-10
17:28:31 UTC (rev 9865)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java 2010-11-10
19:25:04 UTC (rev 9866)
@@ -50,16 +50,16 @@
/** After commit shouldn't throw any exception. Any verification has to be done on
before commit */
public void afterCommit(Transaction tx)
{
- };
+ }
public void beforeRollback(Transaction tx) throws Exception
{
- };
+ }
/** After rollback shouldn't throw any exception. Any verification has to be done
on before rollback */
public void afterRollback(Transaction tx)
{
- };
+ }
// Package protected ---------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-11-10
17:28:31 UTC (rev 9865)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-11-10
19:25:04 UTC (rev 9866)
@@ -30,7 +30,7 @@
public static final int REFS_OPERATION = 6;
- public static final int PAGE_MESSAGES_OPERATION = 7;
+ public static final int PAGE_DELIVERY = 7;
public static final int PAGE_CURSOR_POSITIONS = 8;
}