[hornetq-commits] JBoss hornetq SVN: r9788 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 6 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Oct 14 21:18:34 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-10-14 21:18:33 -0400 (Thu, 14 Oct 2010)
New Revision: 9788
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Treating PageTransactions over the cursors
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -24,8 +24,6 @@
*/
public interface PageTransactionInfo extends EncodingSupport
{
- boolean waitCompletion(int timeoutMilliSeconds) throws Exception;
-
boolean isCommit();
boolean isRollback();
@@ -45,11 +43,9 @@
void storeUpdate(StorageManager storageManager, PagingManager pagingManager, Transaction tx, int depages) throws Exception;
// To be used after the update was stored or reload
- void update(int update, StorageManager storageManager, PagingManager pagingManager);
+ void onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
void increment();
int getNumberOfMessages();
-
- void markIncomplete();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -28,7 +28,9 @@
*/
public interface PagedMessage extends EncodingSupport
{
- ServerMessage getMessage(StorageManager storageManager);
+ ServerMessage getMessage();
+
+ void initMessage(StorageManager storageManager);
long getTransactionID();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -13,7 +13,7 @@
package org.hornetq.core.paging.cursor;
-import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.paging.PagedMessage;
/**
* A LivePageCache
@@ -25,7 +25,5 @@
public interface LivePageCache extends PageCache
{
- void addLiveMessage(ServerMessage message);
-
- void close();
+ void addLiveMessage(PagedMessage message);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -14,7 +14,7 @@
package org.hornetq.core.paging.cursor;
import org.hornetq.core.paging.Page;
-import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.paging.PagedMessage;
/**
* A PageCache
@@ -29,7 +29,7 @@
int getNumberOfMessages();
- void setMessages(ServerMessage[] messages);
+ void setMessages(PagedMessage[] messages);
/**
* If this cache is still being updated
@@ -42,7 +42,7 @@
* @param messageNumber The order of the message on the page
* @return
*/
- ServerMessage getMessage(int messageNumber);
+ PagedMessage getMessage(int messageNumber);
/**
* When the cache is being created,
@@ -54,4 +54,7 @@
* You have to call this method within the same thread you called lock
*/
void unlock();
+
+ void close();
+
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.cursor;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
@@ -31,7 +32,7 @@
void stop();
- Pair<PagePosition, ServerMessage> moveNext() throws Exception;
+ Pair<PagePosition, PagedMessage> moveNext() throws Exception;
void ack(PagePosition position) throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.cursor;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.server.ServerMessage;
@@ -56,9 +57,9 @@
*/
PageCursor createCursor();
- Pair<PagePosition, ServerMessage> getAfter(PagePosition pos) throws Exception;
+ Pair<PagePosition, PagedMessage> getAfter(PageCursor cursor, PagePosition pos) throws Exception;
- ServerMessage getMessage(PagePosition pos) throws Exception;
+ PagedMessage getMessage(PagePosition pos) throws Exception;
void processReload() throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -17,6 +17,7 @@
import java.util.List;
import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.LivePageCache;
import org.hornetq.core.server.ServerMessage;
@@ -33,7 +34,7 @@
// Attributes ----------------------------------------------------
- private final List<ServerMessage> messages = new LinkedList<ServerMessage>();
+ private final List<PagedMessage> messages = new LinkedList<PagedMessage>();
private final Page page;
@@ -74,10 +75,10 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCache#setMessages(org.hornetq.core.server.ServerMessage[])
*/
- public synchronized void setMessages(ServerMessage[] messages)
+ public synchronized void setMessages(PagedMessage[] messages)
{
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
- for (ServerMessage msg : messages)
+ for (PagedMessage msg : messages)
{
addLiveMessage(msg);
}
@@ -86,7 +87,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCache#getMessage(int)
*/
- public synchronized ServerMessage getMessage(int messageNumber)
+ public synchronized PagedMessage getMessage(int messageNumber)
{
if (messageNumber < messages.size())
{
@@ -125,7 +126,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.LivePageCache#addLiveMessage(org.hornetq.core.server.ServerMessage)
*/
- public synchronized void addLiveMessage(ServerMessage message)
+ public synchronized void addLiveMessage(PagedMessage message)
{
this.messages.add(message);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -17,8 +17,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.server.ServerMessage;
/**
* The caching associated to a single page.
@@ -29,28 +29,28 @@
*/
public class PageCacheImpl implements PageCache
{
-
+
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- private ServerMessage[] messages;
-
+
+ private PagedMessage[] messages;
+
private final Page page;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
-
- public PageCacheImpl(Page page)
+
+ public PageCacheImpl(final Page page)
{
this.page = page;
}
// Public --------------------------------------------------------
-
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCache#getPage()
*/
@@ -62,7 +62,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCache#getMessage(int)
*/
- public ServerMessage getMessage(int messageNumber)
+ public PagedMessage getMessage(final int messageNumber)
{
lock.readLock().lock();
try
@@ -81,22 +81,22 @@
lock.readLock().unlock();
}
}
-
+
public void lock()
{
lock.writeLock().lock();
}
-
+
public void unlock()
{
lock.writeLock().unlock();
}
-
- public void setMessages(ServerMessage[] messages)
+
+ public void setMessages(final PagedMessage[] messages)
{
this.messages = messages;
}
-
+
public int getNumberOfMessages()
{
lock.readLock().lock();
@@ -110,6 +110,10 @@
}
}
+ public void close()
+ {
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCache#isLive()
*/
@@ -117,13 +121,13 @@
{
return false;
}
-
+
+ @Override
public String toString()
{
return "PageCacheImpl::page=" + page.getPageId() + " numberOfMessages = " + messages.length;
}
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -29,6 +29,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
@@ -109,14 +110,14 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
- public synchronized Pair<PagePosition, ServerMessage> moveNext() throws Exception
+ public synchronized Pair<PagePosition, PagedMessage> moveNext() throws Exception
{
PagePosition redeliveryPos = null;
// Redeliveries will take precedence
if ((redeliveryPos = redeliveries.poll()) != null)
{
- return new Pair<PagePosition, ServerMessage>(redeliveryPos, cursorProvider.getMessage(redeliveryPos));
+ return new Pair<PagePosition, PagedMessage>(redeliveryPos, cursorProvider.getMessage(redeliveryPos));
}
if (lastPosition == null)
@@ -128,17 +129,17 @@
boolean match = false;
- Pair<PagePosition, ServerMessage> message = null;
+ Pair<PagePosition, PagedMessage> message = null;
do
{
- message = cursorProvider.getAfter(lastPosition);
+ message = cursorProvider.getAfter(this, lastPosition);
if (message != null)
{
lastPosition = message.a;
- match = match(message.b);
+ match = match(message.b.getMessage());
if (!match)
{
@@ -246,7 +247,7 @@
// looking for holes on the ack list for redelivery
while (true)
{
- Pair<PagePosition, ServerMessage> msgCheck = cursorProvider.getAfter(tmpPos);
+ Pair<PagePosition, PagedMessage> msgCheck = cursorProvider.getAfter(this, tmpPos);
positions = getPageInfo(tmpPos);
@@ -258,7 +259,7 @@
}
else
{
- if (match(msgCheck.b))
+ if (match(msgCheck.b.getMessage()))
{
redeliver(msgCheck.a);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -106,8 +106,21 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
*/
- public Pair<PagePosition, ServerMessage> getAfter(final PagePosition pos) throws Exception
+ public Pair<PagePosition, PagedMessage> getAfter(PageCursor cursor, final PagePosition pos) throws Exception
{
+
+ while(true)
+ {
+ Pair<PagePosition, PagedMessage> retPos = internalAfter(pos);
+
+
+
+ return retPos;
+ }
+ }
+
+ private Pair<PagePosition, PagedMessage> internalAfter(final PagePosition pos)
+ {
// TODO: consider page transactions here to avoid receiving an uncommitted message
// TODO: consider the case where a full page is ignored because of a TX
PagePosition retPos = pos.nextMessage();
@@ -131,11 +144,11 @@
}
}
- ServerMessage serverMessage = cache.getMessage(retPos.getMessageNr());
+ PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
if (serverMessage != null)
{
- return new Pair<PagePosition, ServerMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
+ return new Pair<PagePosition, PagedMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
}
else
{
@@ -143,7 +156,7 @@
}
}
- public ServerMessage getMessage(final PagePosition pos) throws Exception
+ public PagedMessage getMessage(final PagePosition pos) throws Exception
{
PageCache cache = getPageCache(pos);
@@ -257,16 +270,13 @@
List<PagedMessage> pgdMessages = page.read();
- ServerMessage srvMessages[] = new ServerMessage[pgdMessages.size()];
-
int i = 0;
for (PagedMessage pdgMessage : pgdMessages)
{
- ServerMessage message = pdgMessage.getMessage(storageManager);
- srvMessages[i++] = message;
+ pdgMessage.initMessage(storageManager);
}
- cache.setMessages(srvMessages);
+ cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
}
finally
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -181,7 +181,7 @@
if (pageCache != null)
{
- pageCache.addLiveMessage(message.getMessage(storageManager));
+ pageCache.addLiveMessage(message);
}
numberOfMessages.incrementAndGet();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -13,8 +13,6 @@
package org.hornetq.core.paging.impl;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
@@ -43,12 +41,10 @@
private volatile long recordID = -1;
- private volatile CountDownLatch countDownCompleted;
+ private volatile boolean committed = false;
- private volatile boolean committed;
+ private volatile boolean rolledback = false;
- private volatile boolean rolledback;
-
private AtomicInteger numberOfMessages = new AtomicInteger(0);
// Static --------------------------------------------------------
@@ -59,7 +55,6 @@
{
this();
this.transactionID = transactionID;
- countDownCompleted = new CountDownLatch(1);
}
public PageTransactionInfoImpl()
@@ -83,7 +78,7 @@
return transactionID;
}
- public void update(final int update, final StorageManager storageManager, PagingManager pagingManager)
+ public void onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager)
{
int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
if (sizeAfterUpdate == 0 && storageManager != null)
@@ -120,7 +115,6 @@
{
transactionID = buffer.readLong();
numberOfMessages.set(buffer.readInt());
- countDownCompleted = null;
committed = true;
}
@@ -135,27 +129,11 @@
return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
}
- public void commit()
+ public synchronized void commit()
{
committed = true;
- /**
- * this is to avoid a race condition where the transaction still being committed while another thread is depaging messages
- */
- countDownCompleted.countDown();
}
- public boolean waitCompletion(final int timeoutMilliseconds) throws InterruptedException
- {
- if (countDownCompleted == null)
- {
- return true;
- }
- else
- {
- return countDownCompleted.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
- }
- }
-
public void store(final StorageManager storageManager, PagingManager pagingManager, final Transaction tx) throws Exception
{
storageManager.storePageTransaction(tx.getID(), this);
@@ -194,7 +172,7 @@
public void afterCommit(Transaction tx)
{
- pgToUpdate.update(depages, storageManager, pagingManager);
+ pgToUpdate.onUpdate(depages, storageManager, pagingManager);
}
});
}
@@ -209,21 +187,12 @@
return rolledback;
}
- public void rollback()
+ public synchronized void rollback()
{
rolledback = true;
committed = false;
- countDownCompleted.countDown();
}
- public void markIncomplete()
- {
- committed = false;
- rolledback = false;
-
- countDownCompleted = new CountDownLatch(1);
- }
-
public String toString()
{
return "PageTransactionInfoImpl(transactionID=" + transactionID +
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -67,8 +67,13 @@
{
}
- public ServerMessage getMessage(final StorageManager storage)
+ public ServerMessage getMessage()
{
+ return message;
+ }
+
+ public void initMessage(StorageManager storage)
+ {
if (largeMessageLazyData != null)
{
message = storage.createLargeMessage();
@@ -76,7 +81,6 @@
message.decodeHeadersAndProperties(buffer);
largeMessageLazyData = null;
}
- return message;
}
public long getTransactionID()
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -1019,7 +1019,7 @@
for (PagedMessage pagedMessage : pagedMessages)
{
- ServerMessage message = pagedMessage.getMessage(storageManager);
+ ServerMessage message = pagedMessage.getMessage();
if (message.isLargeMessage())
{
@@ -1060,7 +1060,7 @@
// This is to avoid a race condition where messages are depaged
// before the commit arrived
- while (running && !pageUserTransaction.waitCompletion(500))
+ while (running)
{
// This is just to give us a chance to interrupt the process..
// if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -948,7 +948,7 @@
PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
- pageTX.update(pageUpdate.recods, null, null);
+ pageTX.onUpdate(pageUpdate.recods, null, null);
}
else
{
@@ -1534,8 +1534,6 @@
pageTransactionInfo.decode(buff);
- pageTransactionInfo.markIncomplete();
-
tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
pagingManager.addTransaction(pageTransactionInfo);
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -548,7 +548,8 @@
private void handlePageWrite(final ReplicationPageWriteMessage packet) throws Exception
{
PagedMessage pgdMessage = packet.getPagedMessage();
- ServerMessage msg = pgdMessage.getMessage(storage);
+ pgdMessage.initMessage(storage);
+ ServerMessage msg = pgdMessage.getMessage();
Page page = getPage(msg.getAddress(), packet.getPageNumber());
page.write(pgdMessage);
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
import org.hornetq.core.paging.cursor.PageCursorProvider;
@@ -110,12 +111,12 @@
PageCursor cursor = cursorProvider.createCursor();
- Pair<PagePosition, ServerMessage> msg;
+ Pair<PagePosition, PagedMessage> msg;
int key = 0;
while ((msg = cursor.moveNext()) != null)
{
- assertEquals(key++, msg.b.getIntProperty("key").intValue());
+ assertEquals(key++, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
assertEquals(NUM_MESSAGES, key);
@@ -169,9 +170,9 @@
for (int i = 0 ; i < 1000 ; i++)
{
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertNotNull(msg);
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
if (i < firstPageSize)
{
@@ -193,9 +194,9 @@
for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
System.out.println("Received " + i);
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertNotNull(msg);
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
@@ -226,8 +227,8 @@
System.out.println("Cursor: " + cursor);
for (int i = 0 ; i < 100 ; i++)
{
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
cursor.ack(msg.a);
@@ -242,16 +243,16 @@
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
@@ -276,8 +277,8 @@
Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
for (int i = 0 ; i < 100 ; i++)
{
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
cursor.ackTx(tx, msg.a);
@@ -296,15 +297,15 @@
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ackTx(tx,msg.a);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ackTx(tx,msg.a);
}
@@ -344,13 +345,13 @@
Assert.assertTrue(pageStore.page(msg));
- Pair<PagePosition, ServerMessage> readMessage = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
assertNotNull(readMessage);
cursor.ack(readMessage.a);
- assertEquals(i, readMessage.b.getIntProperty("key").intValue());
+ assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
assertNull(cursor.moveNext());
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -106,10 +106,10 @@
for (int i = 0; i < msgs.size(); i++)
{
- Assert.assertEquals(simpleDestination, msgs.get(i).getMessage(null).getAddress());
+ Assert.assertEquals(simpleDestination, msgs.get(i).getMessage().getAddress());
UnitTestCase.assertEqualsByteArrays(buffers.get(i).toByteBuffer().array(), msgs.get(i)
- .getMessage(null)
+ .getMessage()
.getBodyBuffer()
.toByteBuffer()
.array());
@@ -178,10 +178,10 @@
for (int i = 0; i < msgs.size(); i++)
{
- Assert.assertEquals(simpleDestination, msgs.get(i).getMessage(null).getAddress());
+ Assert.assertEquals(simpleDestination, msgs.get(i).getMessage().getAddress());
UnitTestCase.assertEqualsByteArrays(buffers.get(i).toByteBuffer().array(), msgs.get(i)
- .getMessage(null)
+ .getMessage()
.getBodyBuffer()
.toByteBuffer()
.array());
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -98,7 +98,7 @@
Assert.assertEquals(1, msgs.size());
UnitTestCase.assertEqualsByteArrays(msg.getBodyBuffer().writerIndex(), msg.getBodyBuffer().toByteBuffer().array(), msgs.get(0)
- .getMessage(null)
+ .getMessage()
.getBodyBuffer()
.toByteBuffer()
.array());
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -290,7 +290,7 @@
for (int i = 0; i < numMessages; i++)
{
HornetQBuffer horn1 = buffers.get(i);
- HornetQBuffer horn2 = msg.get(i).getMessage(null).getBodyBuffer();
+ HornetQBuffer horn2 = msg.get(i).getMessage().getBodyBuffer();
horn1.resetReaderIndex();
horn2.resetReaderIndex();
for (int j = 0; j < horn1.writerIndex(); j++)
@@ -368,9 +368,9 @@
for (int i = 0; i < 5; i++)
{
- Assert.assertEquals(sequence++, msg.get(i).getMessage(null).getMessageID());
+ Assert.assertEquals(sequence++, msg.get(i).getMessage().getMessageID());
UnitTestCase.assertEqualsBuffers(18, buffers.get(pageNr * 5 + i), msg.get(i)
- .getMessage(null)
+ .getMessage()
.getBodyBuffer());
}
}
@@ -413,9 +413,9 @@
Assert.assertEquals(1, msgs.size());
- Assert.assertEquals(1l, msgs.get(0).getMessage(null).getMessageID());
+ Assert.assertEquals(1l, msgs.get(0).getMessage().getMessageID());
- UnitTestCase.assertEqualsBuffers(18, buffers.get(0), msgs.get(0).getMessage(null).getBodyBuffer());
+ UnitTestCase.assertEqualsBuffers(18, buffers.get(0), msgs.get(0).getMessage().getBodyBuffer());
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -594,14 +594,14 @@
for (PagedMessage msg : msgs)
{
- long id = msg.getMessage(null).getBodyBuffer().readLong();
- msg.getMessage(null).getBodyBuffer().resetReaderIndex();
+ long id = msg.getMessage().getBodyBuffer().readLong();
+ msg.getMessage().getBodyBuffer().resetReaderIndex();
ServerMessage msgWritten = buffers.remove(id);
- buffers2.put(id, msg.getMessage(null));
+ buffers2.put(id, msg.getMessage());
Assert.assertNotNull(msgWritten);
- Assert.assertEquals(msg.getMessage(null).getAddress(), msgWritten.getAddress());
- UnitTestCase.assertEqualsBuffers(10, msgWritten.getBodyBuffer(), msg.getMessage(null).getBodyBuffer());
+ Assert.assertEquals(msg.getMessage().getAddress(), msgWritten.getAddress());
+ UnitTestCase.assertEqualsBuffers(10, msgWritten.getBodyBuffer(), msg.getMessage().getBodyBuffer());
}
}
@@ -667,13 +667,13 @@
for (PagedMessage msg : msgs)
{
- long id = msg.getMessage(null).getBodyBuffer().readLong();
+ long id = msg.getMessage().getBodyBuffer().readLong();
ServerMessage msgWritten = buffers2.remove(id);
Assert.assertNotNull(msgWritten);
- Assert.assertEquals(msg.getMessage(null).getAddress(), msgWritten.getAddress());
+ Assert.assertEquals(msg.getMessage().getAddress(), msgWritten.getAddress());
UnitTestCase.assertEqualsByteArrays(msgWritten.getBodyBuffer().writerIndex(),
msgWritten.getBodyBuffer().toByteBuffer().array(),
- msg.getMessage(null).getBodyBuffer().toByteBuffer().array());
+ msg.getMessage().getBodyBuffer().toByteBuffer().array());
}
}
@@ -682,8 +682,8 @@
lastPage.close();
Assert.assertEquals(1, lastMessages.size());
- lastMessages.get(0).getMessage(null).getBodyBuffer().resetReaderIndex();
- Assert.assertEquals(lastMessages.get(0).getMessage(null).getBodyBuffer().readLong(), lastMessageId);
+ lastMessages.get(0).getMessage().getBodyBuffer().resetReaderIndex();
+ Assert.assertEquals(lastMessages.get(0).getMessage().getBodyBuffer().readLong(), lastMessageId);
Assert.assertEquals(0, buffers2.size());
@@ -796,7 +796,7 @@
for (PagedMessage pgmsg : messages)
{
- ServerMessage msg = pgmsg.getMessage(null);
+ ServerMessage msg = pgmsg.getMessage();
assertEquals(msgsRead++, msg.getMessageID());
More information about the hornetq-commits
mailing list