[hornetq-commits] JBoss hornetq SVN: r10649 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/journal/impl and 11 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri May 13 00:10:27 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-05-13 00:10:25 -0400 (Fri, 13 May 2011)
New Revision: 10649
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
JBPAPP-6466 - improving IO on depaging
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -94,6 +94,8 @@
void renameTo(String newFileName) throws Exception;
SequentialFile copy();
+
+ void copyTo(SequentialFile newFileName);
void setTimedBuffer(TimedBuffer buffer);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -104,6 +104,37 @@
file.delete();
}
+
+ /**
+ * Copies the content of this file into {@code newFileName}, in chunks of 10 KiB.
+ * <p>
+ * Both files are opened by this method and are closed again before it returns, also when
+ * the copy fails half-way. Failures are logged as warnings and are NOT propagated, so the
+ * caller cannot tell from this method alone whether the copy succeeded.
+ *
+ * @param newFileName the destination; despite its name this is the target
+ * {@link SequentialFile} itself, not a file name
+ */
+ public void copyTo(SequentialFile newFileName)
+ {
+ final int chunkSize = 10 * 1024;
+
+ // Track what was actually opened so the finally block only closes those files
+ boolean targetOpened = false;
+ boolean sourceOpened = false;
+
+ try
+ {
+ log.debug("Copying " + this + " as " + newFileName);
+ newFileName.open();
+ targetOpened = true;
+ this.open();
+ sourceOpened = true;
+
+ ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
+
+ for (;;)
+ {
+ buffer.rewind();
+ int size = this.read(buffer);
+ // NOTE(review): this relies on read() leaving the buffer positioned for writing
+ // (e.g. flipped, limit == bytes read) - confirm for every SequentialFile implementation
+ newFileName.writeInternal(buffer);
+ // Stop once read() reports fewer bytes than a full chunk
+ if (size < chunkSize)
+ {
+ break;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn("Error on copying file " + this + " as " + newFileName, e);
+ }
+ finally
+ {
+ // Closing here (instead of at the end of the try block) guarantees that neither file
+ // is left open when open/read/write fails, and that a failure on closing the target
+ // does not prevent the source from being closed.
+ if (targetOpened)
+ {
+ try
+ {
+ newFileName.close();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error on closing file " + newFileName + " after copying from " + this, e);
+ }
+ }
+ if (sourceOpened)
+ {
+ try
+ {
+ this.close();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error on closing file " + this + " after copying to " + newFileName, e);
+ }
+ }
+ }
+ }
public void position(final long pos) throws Exception
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -36,6 +36,7 @@
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.StorageManager;
@@ -79,6 +80,8 @@
{
Pair<Map<Long, Set<PagePosition>>, Set<Long>> cursorACKs = PrintPages.loadCursorACKs(arg[1]);
+
+ Set<Long> pgTXs = cursorACKs.b;
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
final ExecutorService executor = Executors.newFixedThreadPool(10);
@@ -146,7 +149,7 @@
System.out.print(",");
}
}
- if (msg.getTransactionID() != 0 && ! cursorACKs.b.contains(msg.getTransactionID()));
+ if (msg.getTransactionID() >= 0 && !pgTXs.contains(msg.getTransactionID()))
{
System.out.print(", **PG_TX_NOT_FOUND**");
}
@@ -230,7 +233,12 @@
}
else
{
- pgTXs.add(record.id);
+ PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+ pageTransactionInfo.decode(buff);
+
+ pageTransactionInfo.setRecordID(record.id);
+ pgTXs.add(pageTransactionInfo.getTransactionID());
}
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -13,6 +13,8 @@
package org.hornetq.core.paging.cursor;
+import java.util.concurrent.Executor;
+
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.server.Queue;
@@ -131,4 +133,9 @@
* @return
*/
PagedMessage queryMessage(PagePosition pos);
+
+ /**
+ * @return
+ */
+ Executor getExecutor();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -14,8 +14,10 @@
package org.hornetq.core.paging.cursor;
import java.lang.ref.WeakReference;
+import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.Message;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -34,6 +36,10 @@
private static final long serialVersionUID = -8640232251318264710L;
+ private static final Logger log = Logger.getLogger(PagedReferenceImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
+
private final PagePosition position;
private WeakReference<PagedMessage> message;
@@ -42,6 +48,8 @@
private int persistedCount;
+ private AtomicInteger deliveryCount = new AtomicInteger(0);
+
private final PageSubscription subscription;
public ServerMessage getMessage()
@@ -84,12 +92,12 @@
{
return true;
}
-
+
public void setPersistedCount(int count)
{
this.persistedCount = count;
}
-
+
public int getPersistedCount()
{
return persistedCount;
@@ -100,8 +108,7 @@
*/
public MessageReference copy(final Queue queue)
{
- // TODO Auto-generated method stub
- return null;
+ return new PagedReferenceImpl(this.position, this.getPagedMessage(), this.subscription);
}
/* (non-Javadoc)
@@ -137,8 +144,7 @@
*/
public int getDeliveryCount()
{
- // TODO Auto-generated method stub
- return 0;
+ return deliveryCount.get();
}
/* (non-Javadoc)
@@ -146,8 +152,7 @@
*/
public void setDeliveryCount(final int deliveryCount)
{
- // TODO Auto-generated method stub
-
+ this.deliveryCount.set(deliveryCount);
}
/* (non-Javadoc)
@@ -155,7 +160,11 @@
*/
public void incrementDeliveryCount()
{
- // TODO Auto-generated method stub
+ deliveryCount.incrementAndGet();
+ if (isTrace)
+ {
+ log.trace("deliveryCount = " + deliveryCount + " for " + this);
+ }
}
@@ -164,8 +173,7 @@
*/
public void decrementDeliveryCount()
{
- // TODO Auto-generated method stub
-
+ deliveryCount.decrementAndGet();
}
/* (non-Javadoc)
@@ -199,4 +207,25 @@
{
subscription.ackTx(tx, this);
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ // Same field order and labels as before, assembled piece by piece for readability.
+ // Note that 'message' is the WeakReference holder, not the paged message itself.
+ final StringBuilder text = new StringBuilder("PagedReferenceImpl [position=");
+ text.append(position);
+ text.append(", message=").append(message);
+ text.append(", deliveryTime=").append(deliveryTime);
+ text.append(", persistedCount=").append(persistedCount);
+ text.append(", deliveryCount=").append(deliveryCount);
+ text.append(", subscription=").append(subscription);
+ text.append("]");
+ return text.toString();
+ }
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -30,7 +30,6 @@
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Future;
import org.hornetq.utils.SoftValueHashMap;
import org.jboss.netty.util.internal.ConcurrentHashMap;
@@ -57,8 +56,7 @@
private final StorageManager storageManager;
- private final ExecutorFactory executorFactory;
-
+ // This is the same executor used at the PageStoreImpl. One Executor per pageStore
private final Executor executor;
private final SoftValueHashMap<Long, PageCache> softCache;
@@ -71,13 +69,12 @@
public PageCursorProviderImpl(final PagingStore pagingStore,
final StorageManager storageManager,
- final ExecutorFactory executorFactory,
+ final Executor executor,
final int maxCacheSize)
{
this.pagingStore = pagingStore;
this.storageManager = storageManager;
- this.executorFactory = executorFactory;
- this.executor = executorFactory.getExecutor();
+ this.executor = executor;
this.softCache = new SoftValueHashMap<Long, PageCache>(maxCacheSize);
}
@@ -96,13 +93,7 @@
throw new IllegalStateException("Cursor " + cursorID + " had already been created");
}
- activeCursor = new PageSubscriptionImpl(this,
- pagingStore,
- storageManager,
- executorFactory.getExecutor(),
- filter,
- cursorID,
- persistent);
+ activeCursor = new PageSubscriptionImpl(this, pagingStore, storageManager, executor, filter, cursorID, persistent);
activeCursors.put(cursorID, activeCursor);
return activeCursor;
}
@@ -389,6 +380,17 @@
{
pagingStore.stopPaging();
}
+ else
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("Couldn't cleanup page on address " + this.pagingStore.getAddress() +
+ " as numberOfPages == " +
+ pagingStore.getNumberOfPages() +
+ " and currentPage.numberOfMessages = " +
+ pagingStore.getCurrentPage().getNumberOfMessages());
+ }
+ }
}
catch (Exception ex)
{
@@ -411,7 +413,7 @@
{
cache = softCache.remove((long)depagedPage.getPageId());
}
-
+
if (cache == null)
{
// The page is not on cache any more
@@ -426,7 +428,7 @@
{
pgdMessages = cache.getMessages();
}
-
+
depagedPage.delete(pgdMessages);
synchronized (softCache)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.MessageReference;
@@ -40,6 +41,8 @@
// Constants -----------------------------------------------------
static final Logger log = Logger.getLogger(PageSubscriptionCounterImpl.class);
+
+ static final boolean isTrace = log.isTraceEnabled();
// Attributes ----------------------------------------------------
@@ -51,8 +54,12 @@
private long recordID = -1;
private boolean persistent;
+
+ private final PageSubscription subscription;
private final StorageManager storage;
+
+ private final Executor executor;
private final AtomicLong value = new AtomicLong(0);
@@ -60,8 +67,6 @@
private LinkedList<Pair<Long, Integer>> loadList;
- private final Executor executor;
-
private final Runnable cleanupCheck = new Runnable()
{
public void run()
@@ -77,14 +82,16 @@
// Constructors --------------------------------------------------
public PageSubscriptionCounterImpl(final StorageManager storage,
+ final PageSubscription subscription,
+ final Executor executor,
final boolean persistent,
- final long subscriptionID,
- final Executor executor)
+ final long subscriptionID)
{
this.subscriptionID = subscriptionID;
+ this.executor = executor;
this.storage = storage;
- this.executor = executor;
this.persistent = persistent;
+ this.subscription = subscription;
}
/* (non-Javadoc)
@@ -253,10 +260,13 @@
}
newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace);
+
+ if (isTrace)
+ {
+ log.trace("Replacing page-counter record = " + recordID + " by record = " + newRecordID + " on subscriptionID = " + this.subscriptionID + " for queue = " + this.subscription.getQueue().getName());
+ }
storage.commit(txCleanup);
-
- storage.waitOnOperations();
}
catch (Exception e)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -92,8 +92,6 @@
private final PageCursorProvider cursorProvider;
- private final Executor executor;
-
private volatile PagePosition lastAckedPosition;
private List<PagePosition> recoveredACK;
@@ -101,6 +99,8 @@
private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
private final PageSubscriptionCounter counter;
+
+ private final Executor executor;
private final AtomicLong deliveredCount = new AtomicLong(0);
@@ -126,7 +126,7 @@
this.executor = executor;
this.filter = filter;
this.persistent = persistent;
- this.counter = new PageSubscriptionCounterImpl(store, persistent, cursorId, executor);
+ this.counter = new PageSubscriptionCounterImpl(store, this, executor, persistent, cursorId);
}
// Public --------------------------------------------------------
@@ -224,7 +224,7 @@
// First get the completed pages using a lock
synchronized (this)
{
- for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
+ for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
{
PageCursorInfo info = entry.getValue();
if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
@@ -687,6 +687,14 @@
}
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageSubscription#getExecutor()
+ */
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+
private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
{
return getPageInfo(pos, true);
@@ -734,8 +742,17 @@
{
if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
{
+ if (isTrace)
+ {
+ log.trace("a new position is being processed as ACK");
+ }
if (lastAckedPosition != null && lastAckedPosition.getPageNr() != pos.getPageNr())
{
+ if (isTrace)
+ {
+ log.trace("Scheduling cleanup on pageSubscription for address = " + pageStore.getAddress() + " queue = " + this.getQueue().getName());
+ }
+
// there's a different page being acked, we will do the check right away
if (autoCleanup)
{
@@ -780,7 +797,7 @@
private PageTransactionInfo getPageTransaction(final PagedReference reference)
{
- if (reference.getPagedMessage().getTransactionID() != 0)
+ if (reference.getPagedMessage().getTransactionID() >= 0)
{
return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID());
}
@@ -816,6 +833,7 @@
private final long pageId;
+ // TODO: Merge removedReferences and acks into a single structure
// Confirmed ACKs on this page
private final Set<PagePosition> acks = Collections.synchronizedSet(new LinkedHashSet<PagePosition>());
@@ -1135,6 +1153,13 @@
{
ignored = true;
}
+
+ PageCursorInfo info = getPageInfo(message.getPosition(), false);
+
+ if (info != null && info.isRemoved(message.getPosition()))
+ {
+ continue;
+ }
// 2nd ... if TX, is it committed?
if (valid && message.getPagedMessage().getTransactionID() >= 0)
@@ -1145,7 +1170,7 @@
{
log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
", ignoring message on position " +
- message.getPosition());
+ message.getPosition() + " on address=" + pageStore.getAddress() + " queue=" + queue.getName());
valid = false;
ignored = true;
}
@@ -1166,7 +1191,7 @@
// Say you have a Browser that will only read the files... there's no need to control PageCursors is
// nothing
// is being changed. That's why the false is passed as a parameter here
- PageCursorInfo info = getPageInfo(message.getPosition(), false);
+
if (info != null && info.isRemoved(message.getPosition()))
{
valid = false;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -84,6 +84,7 @@
HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(largeMessageLazyData);
message.decodeHeadersAndProperties(buffer);
lgMessage.incrementDelayDeletionCount();
+ lgMessage.setPaged();
largeMessageLazyData = null;
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -46,6 +46,7 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
@@ -200,7 +201,7 @@
this.syncTimer = null;
}
- this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory, addressSettings.getPageCacheMaxSize());
+ this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executor, addressSettings.getPageCacheMaxSize());
}
@@ -870,6 +871,11 @@
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), tx == null ? -1 : tx.getID());
+
+ if (message.isLargeMessage())
+ {
+ ((LargeServerMessage)message).setPaged();
+ }
int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -18,6 +18,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
@@ -48,6 +49,8 @@
private final JournalStorageManager storageManager;
private LargeServerMessage linkMessage;
+
+ private boolean paged;
// We should only use the NIO implementation on the Journal
private SequentialFile file;
@@ -82,6 +85,11 @@
// Public --------------------------------------------------------
+ /**
+ * Flags this large message as paged. The flag is never reset by this method; once set,
+ * {@link #copy(long)} duplicates the underlying file instead of sharing it.
+ */
+ public void setPaged()
+ {
+ this.paged = true;
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.server.LargeServerMessage#addBytes(byte[])
*/
@@ -260,27 +268,59 @@
}
}
}
+
+ /**
+ * Applies the original-message headers through the superclass and then takes over the
+ * paged flag of {@code other}.
+ *
+ * @param other the message the headers come from; it is cast to
+ * {@link LargeServerMessageImpl} without a type check
+ * @param expiry passed through unchanged to the superclass implementation
+ */
+ public void setOriginalHeaders(final ServerMessage other, final boolean expiry)
+ {
+ super.setOriginalHeaders(other, expiry);
+
+ // Inherit the paged state of the source message
+ this.paged = ((LargeServerMessageImpl)other).paged;
+
+ if (!this.paged)
+ {
+ return;
+ }
+
+ // For paged messages the original-message-id property is dropped again
+ this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
+ }
+
+
+ /**
+ * Creates a copy of this large message under the ID {@code newID}.
+ * <p>
+ * Two strategies are used, selected by the {@code paged} flag:
+ * a non-paged message is copied as before this change (new message built from the link
+ * message, or from this message when there is none); a paged message gets its file
+ * content physically copied into a new file created for {@code newID}.
+ *
+ * @param newID the message ID of the copy
+ * @return the newly created message
+ */
@Override
public synchronized ServerMessage copy(final long newID)
{
- incrementDelayDeletionCount();
-
- long idToUse = messageID;
-
- if (linkMessage != null)
+ if (!paged)
{
- idToUse = linkMessage.getMessageID();
+ // Not paged: same logic as before this change, now confined to this branch
+ incrementDelayDeletionCount();
+
+ // The new file is created for the link message's ID when a link message exists,
+ // otherwise for this message's own ID
+ long idToUse = messageID;
+
+ if (linkMessage != null)
+ {
+ idToUse = linkMessage.getMessageID();
+ }
+
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
+
+ ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+ : (LargeServerMessageImpl)linkMessage,
+ newfile,
+ newID);
+ return newMessage;
}
-
- SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
-
- ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
- : (LargeServerMessageImpl)linkMessage,
- newfile,
- newID);
-
- return newMessage;
+ else
+ {
+ // Paged: the copy gets its own file (created for newID) holding a copy of the content
+ // Local alias of the 'file' field (shadows the field inside this branch)
+ SequentialFile file = this.file;
+
+ SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
+
+ // NOTE(review): copyTo declares no checked exception; the AbstractSequentialFile
+ // implementation in this same change logs failures instead of throwing, so a failed
+ // copy would not be noticed here - confirm this is acceptable
+ file.copyTo(newFile);
+
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+
+ // The copy is made independent: no link message, and it is itself flagged as paged
+ newMessage.linkMessage = null;
+
+ newMessage.setPaged();
+
+ return newMessage;
+ }
}
public SequentialFile getFile()
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -157,7 +157,14 @@
return "LargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.LargeServerMessage#setPaged()
+ */
+ public void setPaged()
+ {
+ // No-op in this implementation: the call is accepted and ignored
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -30,7 +30,13 @@
void setLinkedMessage(LargeServerMessage message);
boolean isFileExists() throws Exception;
-
+
+ /**
+ * We have to copy the large message content in case of DLQ and paged messages
+ * For that we need to pre-mark the LargeMessage with a flag when it is paged
+ */
+ void setPaged();
+
/** Close the files if opened */
void releaseResources();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -72,6 +72,8 @@
public class QueueImpl implements Queue
{
private static final Logger log = Logger.getLogger(QueueImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
@@ -396,7 +398,7 @@
public void deliverAsync()
{
- executor.execute(deliverRunner);
+ getExecutor().execute(deliverRunner);
}
public void close() throws Exception
@@ -411,7 +413,15 @@
+ /**
+ * Returns the executor that work for this queue should be submitted to.
+ * <p>
+ * The result is evaluated on every call and can differ between calls: while the page
+ * subscription reports that it is paging, the subscription's executor is returned;
+ * otherwise (including when this queue has no page subscription) the queue's own
+ * executor is returned.
+ *
+ * @return the executor to use at the time of the call
+ */
public Executor getExecutor()
{
- return executor;
+ // pageSubscription may be null (the delivery loop guards for that as well), so it must be
+ // checked before being dereferenced
+ if (pageSubscription != null && pageSubscription.isPaging())
+ {
+ // When in page mode, we don't want to have concurrent IO on the same PageStore
+ return pageSubscription.getExecutor();
+ }
+ else
+ {
+ return executor;
+ }
}
/* Only used on tests */
@@ -432,7 +442,7 @@
if (!ok)
{
- log.warn("Couldn't finish waiting executors. Try increasing the thread pool size");
+ log.warn("Couldn't finish waiting executors. Try increasing the thread pool size", new Exception ("trace"));
}
return ok;
@@ -1440,6 +1450,8 @@
int numRefs = messageReferences.size();
int handled = 0;
+
+ long timeout = System.currentTimeMillis() + 1000;
while (handled < numRefs)
{
@@ -1451,6 +1463,19 @@
return;
}
+
+ if (pageSubscription != null && pageSubscription.isPaging() && System.currentTimeMillis() > timeout)
+ {
+ if (isTrace)
+ {
+ log.trace("Page delivery has been running for too long. Scheduling another delivery task now");
+ }
+
+ deliverAsync();
+
+ return;
+ }
+
ConsumerHolder holder = consumerList.get(pos);
@@ -1549,7 +1574,7 @@
}
}
- if (pageIterator != null && messageReferences.size() == 0 && pageIterator.hasNext())
+ if (pageIterator != null && messageReferences.size() == 0 && pageSubscription.isPaging() && pageIterator.hasNext())
{
scheduleDepage();
}
@@ -1580,7 +1605,7 @@
private void scheduleDepage()
{
- executor.execute(depageRunner);
+ pageSubscription.getExecutor().execute(depageRunner);
}
private void depage()
@@ -1629,11 +1654,13 @@
if (internalQueue)
{
+ if (isTrace)
+ {
+ log.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery");
+ }
// no DLQ check on internal queues
return true;
}
-
- // TODO: DeliveryCount on paging
if (!internalQueue && message.isDurable() && durable && !reference.isPaged())
{
@@ -1644,9 +1671,18 @@
int maxDeliveries = addressSettings.getMaxDeliveryAttempts();
+ if (isTrace)
+ {
+ log.trace("Checking redelivery for reference = " + reference + " with maxDeliveries = " + maxDeliveries + " on queue " + address);
+ }
+
// First check DLA
if (maxDeliveries > 0 && reference.getDeliveryCount() >= maxDeliveries)
{
+ if (isTrace)
+ {
+ log.trace("Sending reference " + reference + " to DLA");
+ }
sendToDeadLetterAddress(reference);
return false;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -16,7 +16,6 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -84,8 +83,6 @@
private final ServerSession session;
- private final Executor executor;
-
private final Object lock = new Object();
private volatile AtomicInteger availableCredits = new AtomicInteger(0);
@@ -153,8 +150,6 @@
messageQueue = binding.getQueue();
- this.executor = messageQueue.getExecutor();
-
this.started = browseOnly || started;
this.browseOnly = browseOnly;
@@ -376,7 +371,7 @@
Future future = new Future();
- executor.execute(future);
+ messageQueue.getExecutor().execute(future);
boolean ok = future.await(10000);
@@ -483,7 +478,7 @@
Future future = new Future();
- executor.execute(future);
+ messageQueue.getExecutor().execute(future);
boolean ok = future.await(10000);
@@ -668,7 +663,7 @@
{
if (browseOnly)
{
- executor.execute(browserDeliverer);
+ messageQueue.getExecutor().execute(browserDeliverer);
}
else
{
@@ -680,7 +675,7 @@
private void resumeLargeMessage()
{
- executor.execute(resumeLargeMessageRunnable);
+ messageQueue.getExecutor().execute(resumeLargeMessageRunnable);
}
private void deliverLargeMessage(final MessageReference ref, final ServerMessage message) throws Exception
@@ -723,7 +718,7 @@
{
if (browseOnly)
{
- executor.execute(browserDeliverer);
+ messageQueue.getExecutor().execute(browserDeliverer);
}
else
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -13,10 +13,13 @@
package org.hornetq.tests.integration.client;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -42,9 +45,16 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
@@ -262,8 +272,7 @@
consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
-
-
+
assertEquals(numberOfMessages, queue.getMessageCount());
ClientMessage msg = consumer.receive(5000);
@@ -284,7 +293,7 @@
}
assertNull(msg);
- for (int i = xids.size() -1 ; i >= 0; i--)
+ for (int i = xids.size() - 1; i >= 0; i--)
{
Xid xid = xids.get(i);
session.rollback(xid);
@@ -298,25 +307,25 @@
session = sf.createSession(false, false, false);
session.start();
-
+
consumer = session.createConsumer(PagingTest.ADDRESS);
-
+
for (int i = 0; i < numberOfMessages; i++)
{
msg = consumer.receive(1000);
assertNotNull(msg);
msg.acknowledge();
-
+
assertEquals(i, msg.getIntProperty("id").intValue());
-
+
if (i % 500 == 0)
{
session.commit();
}
}
-
+
session.commit();
-
+
session.close();
sf.close();
@@ -324,14 +333,13 @@
locator.close();
assertEquals(0, queue.getMessageCount());
-
+
long timeout = System.currentTimeMillis() + 5000;
while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
{
Thread.sleep(100);
}
- assertFalse (queue.getPageSubscription().getPagingStore().isPaging());
- // assertEquals(numberOfMessages, queue.getMessageCount());
+ assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
}
finally
{
@@ -346,6 +354,350 @@
}
+ public void testMissingTXEverythingAcked() throws Exception
+ {
+ clearData();
+ // Scenario: send to q1/q2, delete the journal records offline, ack the current page for q1, then expect no deliveries and paging stopped.
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 5000;
+
+ final int numberOfTX = 10;
+
+ final int messagesPerTX = numberOfMessages / numberOfTX;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+ // NOTE(review): this locator and its session factory are never closed in this block (only the session is) - confirm that is acceptable.
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS.toString(), "q1", true);
+
+ session.createQueue(ADDRESS.toString(), "q2", true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+ // Every message carries the same sample body and a sequential "id" property; a commit is issued whenever i is a multiple of messagesPerTX.
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % messagesPerTX == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ session.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ // With the server stopped, open the journal directly and load its records and prepared transactions.
+ ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
+
+ List<PreparedTransactionInfo> list = new ArrayList<PreparedTransactionInfo>();
+
+ JournalImpl jrn = new JournalImpl(config.getJournalFileSize(),
+ 2,
+ 0,
+ 0,
+ new NIOSequentialFileFactory(getJournalDir()),
+ "hornetq-data",
+ "hq",
+ 1);
+ jrn.start();
+ jrn.load(records, list, null);
+
+ // Delete everything from the journal (one delete record per loaded non-update record)
+ for (RecordInfo info : records)
+ {
+ if (!info.isUpdate)
+ {
+ jrn.appendDeleteRecord(info.id, false);
+ }
+ }
+
+ jrn.stop();
+ // Restart the server on top of the modified journal.
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+ // The current page is read only to learn how many messages it holds (msgs.size() below).
+ Page pg = server.getPagingManager().getPageStore(ADDRESS).getCurrentPage();
+
+ pg.open();
+
+ List<PagedMessage> msgs = pg.read(server.getStorageManager());
+
+ pg.close();
+ // Only q1 is listed here, so the cursor acknowledgements below are stored for q1 alone.
+ long queues[] = new long[] { server.locateQueue(new SimpleString("q1")).getID() };
+
+ for (long q : queues)
+ {
+ for (int i = 0; i < msgs.size(); i++)
+ {
+ server.getStorageManager().storeCursorAcknowledge(q, new PagePositionImpl(pg.getPageId(), i));
+ }
+ }
+
+ server.stop();
+ // Final restart: neither q1 nor q2 is expected to deliver a message.
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ ClientSessionFactory csf = locator.createSessionFactory();
+
+ ClientSession sess = csf.createSession();
+
+ sess.start();
+
+ ClientConsumer cons = sess.createConsumer("q1");
+
+ assertNull(cons.receive(500));
+ // NOTE(review): fixed 5 second pause before q2 is checked - presumably gives background cleanup time to run; confirm.
+ Thread.sleep(5000);
+
+ ClientConsumer cons2 = sess.createConsumer("q2");
+ assertNull(cons2.receive(500));
+ // Poll for up to 5 seconds for the address to leave paging mode before asserting it.
+ long timeout = System.currentTimeMillis() + 5000;
+
+ while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+ {
+ Thread.sleep(100);
+ }
+
+ assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+ sess.close();
+
+ locator.close();
+
+ server.stop();
+ }
+
+ public void testMissingTXEverythingAcked2() throws Exception
+ {
+ clearData();
+ // Scenario: page 6 messages in 2 transactions to q1 and q2, consume str-0..str-2 from both, restart, then expect str-3..str-5 on both queues.
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 6;
+
+ final int numberOfTX = 2;
+
+ final int messagesPerTX = numberOfMessages / numberOfTX;
+
+ // Declared outside the try so that the finally block closes the very locator this test uses.
+ ServerLocator locator = createInVMNonHALocator();
+ try
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS.toString(), "q1", true);
+
+ session.createQueue(ADDRESS.toString(), "q2", true);
+ // Put the address into paging mode before anything is sent.
+ server.getPagingManager().getPageStore(ADDRESS).startPaging();
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putStringProperty("id", "str-" + i);
+
+ producer.send(message);
+ if ((i + 1) % messagesPerTX == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ // Consume and commit the first three messages (str-0..str-2) from each of the two queues.
+ session.start();
+
+ for (int i = 1; i <= 2; i++)
+ {
+ ClientConsumer cons = session.createConsumer("q" + i);
+
+ for (int j = 0; j < 3; j++)
+ {
+ ClientMessage msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ assertEquals("str-" + j, msg.getStringProperty("id"));
+
+ msg.acknowledge();
+ }
+
+ session.commit();
+
+ }
+
+ session.close();
+ }
+ finally
+ {
+ locator.close();
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ // Restart the server; the three messages not consumed above (str-3..str-5) must still arrive on both queues.
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory csf = locator.createSessionFactory();
+
+ ClientSession session = csf.createSession();
+
+ session.start();
+
+ for (int i = 1; i <= 2; i++)
+ {
+ ClientConsumer cons = session.createConsumer("q" + i);
+
+ for (int j = 3; j < 6; j++)
+ {
+ ClientMessage msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ assertEquals("str-" + j, msg.getStringProperty("id"));
+
+ msg.acknowledge();
+ }
+
+ session.commit();
+ assertNull(cons.receive(500));
+
+ }
+
+ session.close();
+ // Wait up to 5 seconds for paging to stop. NOTE(review): the outcome is not asserted - confirm whether assertFalse(isPaging()) was intended.
+ long timeout = System.currentTimeMillis() + 5000;
+
+ while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+ {
+ Thread.sleep(100);
+ }
+
+ locator.close();
+
+ server.stop();
+ }
+
public void testTwoQueuesOneNoRouting() throws Exception
{
boolean persistentMessages = true;
@@ -2787,8 +3139,7 @@
}
}
}
-
-
+
public void testPageAndDepageRapidly() throws Exception
{
boolean persistentMessages = true;
@@ -2800,11 +3151,7 @@
config.setJournalSyncNonTransactional(false);
config.setJournalFileSize(10 * 1024 * 1024);
- HornetQServer server = createServer(true,
- config,
- 512 * 1024,
- 1024 * 1024,
- new HashMap<String, AddressSettings>());
+ HornetQServer server = createServer(true, config, 512 * 1024, 1024 * 1024, new HashMap<String, AddressSettings>());
server.start();
@@ -2827,9 +3174,9 @@
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
-
+
final AtomicInteger errors = new AtomicInteger(0);
-
+
Thread consumeThread = new Thread()
{
public void run()
@@ -2839,16 +3186,16 @@
{
sessionConsumer = sf.createSession(false, false);
sessionConsumer.start();
-
+
ClientConsumer cons = sessionConsumer.createConsumer(ADDRESS);
-
+
for (int i = 0; i < numberOfMessages; i++)
{
ClientMessage msg = cons.receive(PagingTest.RECEIVE_TIMEOUT);
System.out.println("Message " + i + " consumed");
assertNotNull(msg);
msg.acknowledge();
-
+
if (i % 20 == 0)
{
System.out.println("Commit consumer");
@@ -2874,10 +3221,10 @@
errors.incrementAndGet();
}
}
-
+
}
};
-
+
consumeThread.start();
ClientMessage message = null;
@@ -2887,7 +3234,7 @@
for (int i = 0; i < numberOfMessages; i++)
{
message = session.createMessage(persistentMessages);
-
+
System.out.println("Message " + i + " sent");
HornetQBuffer bodyLocal = message.getBodyBuffer();
@@ -2897,23 +3244,24 @@
message.putIntProperty(new SimpleString("id"), i);
producer.send(message);
-
+
Thread.sleep(50);
}
-
consumeThread.join();
-
+
long timeout = System.currentTimeMillis() + 5000;
-
- while (System.currentTimeMillis() < timeout && (server.getPagingManager().getPageStore(ADDRESS).isPaging() || server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages() != 1))
+
+ while (System.currentTimeMillis() < timeout && (server.getPagingManager().getPageStore(ADDRESS).isPaging() || server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getNumberOfPages() != 1))
{
Thread.sleep(1);
}
// It's async, so need to wait a bit for it happening
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
-
+
assertEquals(1, server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
sf.close();
@@ -2933,7 +3281,6 @@
}
-
public void testTwoQueuesDifferentFilters() throws Exception
{
boolean persistentMessages = true;
@@ -2959,7 +3306,7 @@
try
{
ServerLocator locator = createInVMNonHALocator();
-
+
locator.setClientFailureCheckPeriod(120000);
locator.setConnectionTTL(5000000);
locator.setCallTimeout(120000);
@@ -2971,14 +3318,16 @@
ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
-
+
// note: if you want to change this, numberOfMessages has to be a multiple of NQUEUES
int NQUEUES = 2;
-
- for (int i = 0 ; i < NQUEUES; i++)
+ for (int i = 0; i < NQUEUES; i++)
{
- session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=" + i), new SimpleString("propTest=" + i), true);
+ session.createQueue(PagingTest.ADDRESS,
+ PagingTest.ADDRESS.concat("=" + i),
+ new SimpleString("propTest=" + i),
+ true);
}
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@@ -3012,20 +3361,20 @@
for (int nqueue = 0; nqueue < NQUEUES; nqueue++)
{
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=" + nqueue));
-
- for (int i = 0; i < (numberOfMessages /NQUEUES); i++)
+
+ for (int i = 0; i < (numberOfMessages / NQUEUES); i++)
{
message = consumer.receive(500000);
assertNotNull(message);
message.acknowledge();
-
+
assertEquals(nqueue, message.getIntProperty("propTest").intValue());
}
-
+
assertNull(consumer.receiveImmediate());
-
+
consumer.close();
-
+
session.commit();
}
@@ -3038,7 +3387,6 @@
Thread.sleep(100);
}
-
// It's async, so need to wait a bit for it happening
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
@@ -3056,11 +3404,8 @@
{
}
}
-
}
-
-
public void testTwoQueues() throws Exception
{
boolean persistentMessages = true;
@@ -3086,7 +3431,7 @@
try
{
ServerLocator locator = createInVMNonHALocator();
-
+
locator.setClientFailureCheckPeriod(120000);
locator.setConnectionTTL(5000000);
locator.setCallTimeout(120000);
@@ -3098,7 +3443,6 @@
ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
-
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), null, true);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), null, true);
@@ -3133,22 +3477,22 @@
for (int msg = 1; msg <= 2; msg++)
{
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=" + msg));
-
+
for (int i = 0; i < numberOfMessages; i++)
{
message = consumer.receive(500000);
assertNotNull(message);
message.acknowledge();
-
- //assertEquals(msg, message.getIntProperty("propTest").intValue());
-
+
+ // assertEquals(msg, message.getIntProperty("propTest").intValue());
+
System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
}
-
+
session.commit();
-
+
assertNull(consumer.receiveImmediate());
-
+
consumer.close();
}
@@ -3162,10 +3506,9 @@
}
store.getCursorProvier().cleanup();
-
+
Thread.sleep(1000);
-
-
+
// It's async, so need to wait a bit for it happening
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
@@ -3183,11 +3526,221 @@
{
}
}
-
}
+ public void testDLAOnLargeMessageAndPaging() throws Exception
+ {
+ clearData();
+ Configuration config = createDefaultConfig();
+ config.setJournalSyncNonTransactional(false);
+ // Scenario: stream-bodied messages are paged on ADDRESS; the first two are rolled back 5 times each (the limit set below) and must reach "DLA" intact.
+ Map<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
+ AddressSettings dla = new AddressSettings();
+ dla.setMaxDeliveryAttempts(5);
+ dla.setDeadLetterAddress(new SimpleString("DLA"));
+ settings.put(ADDRESS.toString(), dla);
+
+ final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ // Declared outside the try so that the finally block closes the locator currently in use (it is re-created after the restart).
+ ServerLocator locator = createInVMNonHALocator();
+ try
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ session.createQueue("DLA", "DLA");
+
+ PagingStore pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+ pgStoreAddress.startPaging();
+ PagingStore pgStoreDLA = server.getPagingManager().getPageStore(new SimpleString("DLA"));
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+ // Send 500 messages, committing after every second one; for the first 400 each commit also forces a new page.
+ for (int i = 0; i < 500; i++)
+ {
+ log.info("send message #" + i);
+ message = session.createMessage(true);
+
+ message.putStringProperty("id", "str" + i);
+
+ message.setBodyInputStream(createFakeLargeStream(messageSize));
+
+ producer.send(message);
+
+ if ((i + 1) % 2 == 0)
+ {
+ session.commit();
+ if (i < 400)
+ {
+ pgStoreAddress.forceAnotherPage();
+ }
+ }
+ }
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer(ADDRESS);
+
+ ClientMessage msg = null;
+ // Receive and roll back each of the first two messages 5 times, checking id and body on every delivery.
+ for (int msgNr = 0; msgNr < 2; msgNr++)
+ {
+ for (int i = 0; i < 5; i++)
+ {
+ msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+
+ assertEquals("str" + msgNr, msg.getStringProperty("id"));
+
+ for (int j = 0; j < messageSize; j++)
+ {
+ assertEquals(getSamplebyte(j), msg.getBodyBuffer().readByte());
+ }
+
+ session.rollback();
+ }
+ // Put the DLA address into paging mode as well (repeated on each outer iteration).
+ pgStoreDLA.startPaging();
+ }
+ // Drain messages 2..499; they are acknowledged but this session never commits, so the same range is expected again after the restart below.
+ for (int i = 2; i < 500; i++)
+ {
+ log.info("Received message " + i);
+ message = cons.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ message.saveToOutputStream(new OutputStream()
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+ // the body is read through and discarded
+ }
+ });
+
+ }
+
+ assertNull(cons.receiveImmediate());
+
+ cons.close();
+
+ sf.close();
+
+ locator.close();
+ // Restart the same server instance and reconnect through a fresh locator.
+ server.stop();
+
+ server.start();
+
+ locator = createInVMNonHALocator();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false);
+
+ session.start();
+
+ cons = session.createConsumer(ADDRESS);
+ // Messages 2..499 must be delivered once more; this time the acknowledgements are committed further down.
+ for (int i = 2; i < 500; i++)
+ {
+ log.info("Received message " + i);
+ message = cons.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ message.saveToOutputStream(new OutputStream()
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+ // the body is read through and discarded
+ }
+ });
+ }
+
+ cons.close();
+ // The two rolled-back messages must now be available on the "DLA" queue with unchanged bodies.
+ cons = session.createConsumer("DLA");
+
+ for (int msgNr = 0; msgNr < 2; msgNr++)
+ {
+ msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ assertEquals("str" + msgNr, msg.getStringProperty("id"));
+
+ for (int i = 0; i < messageSize; i++)
+ {
+ assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte());
+ }
+
+ msg.acknowledge();
+ }
+
+ cons.close();
+
+ cons = session.createConsumer(ADDRESS);
+
+ session.commit();
+
+ assertNull(cons.receiveImmediate());
+ // Trigger cursor cleanup explicitly, then wait up to 5 seconds for ADDRESS to leave paging mode.
+ long timeout = System.currentTimeMillis() + 5000;
+
+ pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+
+ pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
+
+ pgStoreAddress.getCursorProvier().cleanup();
+
+ while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging())
+ {
+ Thread.sleep(50);
+ }
+
+ assertFalse(pgStoreAddress.isPaging());
+
+ session.commit();
+
+ session.close();
+ }
+ finally
+ {
+ locator.close();
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -97,7 +97,7 @@
PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS),
server.getStorageManager(),
- server.getExecutorFactory(),
+ server.getExecutorFactory().getExecutor(),
5);
for (int i = 0; i < numberOfPages; i++)
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-05-13 04:10:25 UTC (rev 10649)
@@ -674,6 +674,15 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFile#copyTo(org.hornetq.core.journal.SequentialFile)
+ */
+ public void copyTo(SequentialFile newFileName)
+ {
+ // No-op: the body is empty, so nothing is copied to newFileName.
+ // NOTE(review): left over from an auto-generated stub - confirm no test depends on a real copy.
+ }
+
}
/* (non-Javadoc)
More information about the hornetq-commits
mailing list