Author: clebert.suconic(a)jboss.com
Date: 2010-11-11 12:28:05 -0500 (Thu, 11 Nov 2010)
New Revision: 9876
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
Log:
Scheduling on paging
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-11 17:28:05 UTC (rev 9876)
@@ -13,6 +13,7 @@
package org.hornetq.core.paging.cursor;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.server.Queue;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.LinkedListIterator;
@@ -111,4 +112,11 @@
void setQueue(Queue queue);
Queue getQueue();
+
+ /**
+ * To be used to requery the reference case the Garbage Collection removed it from the PagedReference as it's using WeakReferences
+ * @param pos
+ * @return
+ */
+ PagedMessage queryMessage(PagePosition pos);
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2010-11-11 17:28:05 UTC (rev 9876)
@@ -13,6 +13,9 @@
package org.hornetq.core.paging.cursor;
+import java.lang.ref.WeakReference;
+
+import org.hornetq.api.core.Message;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -33,18 +36,32 @@
private final PagePosition position;
- private final PagedMessage message;
+ private WeakReference<PagedMessage> message;
+ private Long deliveryTime = null;
+
private final PageSubscription subscription;
public ServerMessage getMessage()
{
- return message.getMessage();
+ return getPagedMessage().getMessage();
}
- public PagedMessage getPagedMessage()
+ public synchronized PagedMessage getPagedMessage()
{
- return message;
+ PagedMessage returnMessage = message.get();
+
+ // We only keep a few references on the Queue from paging...
+ // Besides those references are SoftReferenced on page cache...
+ // So, this will unlikely be null,
+ // unless the Queue has stalled for some time after paging
+ if (returnMessage == null)
+ {
+ // reference is gone, we will reconstruct it
+ returnMessage = subscription.queryMessage(position);
+ message = new WeakReference<PagedMessage>(returnMessage);
+ }
+ return returnMessage;
}
public PagePosition getPosition()
@@ -55,7 +72,7 @@
public PagedReferenceImpl(final PagePosition position, final PagedMessage message,
final PageSubscription subscription)
{
this.position = position;
- this.message = message;
+ this.message = new WeakReference<PagedMessage>(message);
this.subscription = subscription;
}
@@ -78,8 +95,19 @@
*/
public long getScheduledDeliveryTime()
{
- // TODO Auto-generated method stub
- return 0;
+ if (deliveryTime == null)
+ {
+ ServerMessage msg = getMessage();
+ if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
+ {
+ deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+ }
+ else
+ {
+ deliveryTime = 0l;
+ }
+ }
+ return deliveryTime;
}
/* (non-Javadoc)
@@ -87,8 +115,7 @@
*/
public void setScheduledDeliveryTime(final long scheduledDeliveryTime)
{
- // TODO Auto-generated method stub
-
+ deliveryTime = scheduledDeliveryTime;
}
/* (non-Javadoc)
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-11 17:28:05 UTC (rev 9876)
@@ -522,9 +522,10 @@
// the page stays locked until the entire reading is finished
if (needToRead)
{
+ Page page = null;
try
{
- Page page = pagingStore.createPage((int)pageId);
+ page = pagingStore.createPage((int)pageId);
page.open();
@@ -540,6 +541,16 @@
}
finally
{
+ try
+ {
+ if (page != null)
+ {
+ page.close();
+ }
+ }
+ catch (Throwable ignored)
+ {
+ }
cache.unlock();
}
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-11 17:28:05 UTC (rev 9876)
@@ -32,6 +32,7 @@
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursorProvider;
@@ -464,6 +465,23 @@
}
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageSubscription#queryMessage(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public PagedMessage queryMessage(PagePosition pos)
+ {
+ try
+ {
+ return cursorProvider.getMessage(pos);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+
/**
* Theres no need to synchronize this method as it's only called from journal load on startup
*/
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-11 17:28:05 UTC (rev 9876)
@@ -874,49 +874,8 @@
if (store.page(message, context, entry.getValue()))
{
- if (tx != null)
- {
- PageDelivery delivery = (PageDelivery)tx.getProperty(TransactionPropertyIndexes.PAGE_DELIVERY);
- if (delivery == null)
- {
- delivery = new PageDelivery();
- tx.putProperty(TransactionPropertyIndexes.PAGE_DELIVERY, delivery);
- tx.addOperation(delivery);
- }
-
- delivery.addQueues(entry.getValue().getDurableQueues());
- delivery.addQueues(entry.getValue().getNonDurableQueues());
- }
- else
- {
-
- List<Queue> durableQueues = entry.getValue().getDurableQueues();
- List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
-
- final List<Queue> queues = new ArrayList<Queue>(durableQueues.size() + nonDurableQueues.size());
-
- queues.addAll(durableQueues);
- queues.addAll(nonDurableQueues);
-
- storageManager.afterCompleteOperations(new IOAsyncTask()
- {
-
- public void onError(int errorCode, String errorMessage)
- {
- }
-
- public void done()
- {
- for (Queue queue : queues)
- {
- // in case of paging, we need to kick asynchronous delivery to try delivering
- queue.deliverAsync();
- }
- }
- });
- }
-
-
+ // We need to kick delivery so the Queues may check for the cursors case they are empty
+ schedulePageDelivery(tx, entry);
continue;
}
@@ -1023,6 +982,56 @@
}
/**
+ * This will kick a delivery async on the queue, so the queue may have a chance to depage messages
+ * @param tx
+ * @param entry
+ */
+ private void schedulePageDelivery(Transaction tx, Map.Entry<SimpleString, RouteContextList> entry)
+ {
+ if (tx != null)
+ {
+ PageDelivery delivery = (PageDelivery)tx.getProperty(TransactionPropertyIndexes.PAGE_DELIVERY);
+ if (delivery == null)
+ {
+ delivery = new PageDelivery();
+ tx.putProperty(TransactionPropertyIndexes.PAGE_DELIVERY, delivery);
+ tx.addOperation(delivery);
+ }
+
+ delivery.addQueues(entry.getValue().getDurableQueues());
+ delivery.addQueues(entry.getValue().getNonDurableQueues());
+ }
+ else
+ {
+
+ List<Queue> durableQueues = entry.getValue().getDurableQueues();
+ List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
+
+ final List<Queue> queues = new ArrayList<Queue>(durableQueues.size() + nonDurableQueues.size());
+
+ queues.addAll(durableQueues);
+ queues.addAll(nonDurableQueues);
+
+ storageManager.afterCompleteOperations(new IOAsyncTask()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ for (Queue queue : queues)
+ {
+ // in case of paging, we need to kick asynchronous delivery to try delivering
+ queue.deliverAsync();
+ }
+ }
+ });
+ }
+ }
+
+ /**
* @param refs
*/
private void addReferences(final List<MessageReference> refs, final boolean direct)