Author: clebert.suconic(a)jboss.com
Date: 2010-10-04 18:20:52 -0400 (Mon, 04 Oct 2010)
New Revision: 9745
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
recovery of cursors
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -16,6 +16,7 @@
import java.util.List;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -60,6 +61,8 @@
boolean page(ServerMessage message) throws Exception;
Page createPage(final int page) throws Exception;
+
+ PageCursorProvider getCursorProvier();
/**
* @return false if a thread was already started, or if not in page mode
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -30,9 +30,24 @@
PagePosition getFirstPosition();
- void ack(PagePosition position);
+ void ack(PagePosition position) throws Exception;
- void ack(long tx, PagePosition position);
+ void ackTx(long tx, PagePosition position) throws Exception;
+
+ /**
+ * @param position
+ */
+ void recoverACK(PagePosition position);
+
+ /**
+ * To be used to avoid a redelivery of a prepared ACK after load
+ * @param position
+ */
+ void recoverPreparedACK(PagePosition position);
+ /**
+ * To be used on redeliveries
+ * @param position
+ */
void returnElement(PagePosition position);
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -46,7 +46,7 @@
* @param queueId The cursorID should be the same as the queueId associated for
persistance
* @return
*/
- PageCursor createCursor(long queueId);
+ PageCursor getCursor(long queueId);
/**
* Create a non persistent cursor, usually associated with browsing
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -35,11 +35,11 @@
// Attributes ----------------------------------------------------
- private StorageManager store;
+ private final StorageManager store;
private final long cursorId;
- private PagingStore pageStore;
+ private final PagingStore pageStore;
private final PageCursorProvider cursorProvider;
@@ -49,7 +49,10 @@
// Constructors --------------------------------------------------
- public PageCursorImpl(PageCursorProvider cursorProvider, PagingStore pageStore,
StorageManager store, long cursorId)
+ public PageCursorImpl(final PageCursorProvider cursorProvider,
+ final PagingStore pageStore,
+ final StorageManager store,
+ final long cursorId)
{
this.pageStore = pageStore;
this.store = store;
@@ -86,22 +89,20 @@
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
*/
- public void ack(PagePosition position)
+ public void ack(final PagePosition position) throws Exception
{
- // TODO Auto-generated method stub
+ store.storeCursorAcknowledge(cursorId, position);
}
-
- public void ack(long tx, PagePosition position)
+
+ public void ackTx(final long tx, final PagePosition position) throws Exception
{
-
+ store.storeCursorAcknowledgeTransactional(tx, cursorId, position);
}
-
-
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
*/
- public void returnElement(PagePosition position)
+ public void returnElement(final PagePosition position)
{
// TODO Auto-generated method stub
@@ -120,7 +121,7 @@
// Protected -----------------------------------------------------
- protected boolean match(ServerMessage message)
+ protected boolean match(final ServerMessage message)
{
return true;
}
@@ -133,6 +134,24 @@
return new PagePositionImpl(firstPage, -1);
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.paging.cursor.PageCursor#recoverACK(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void recoverACK(final PagePosition position)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.paging.cursor.PageCursor#recoverPreparedACK(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void recoverPreparedACK(final PagePosition position)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
// Inner classes -------------------------------------------------
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.cursor.impl;
import java.util.List;
+import java.util.concurrent.ConcurrentMap;
import org.hornetq.api.core.Pair;
import org.hornetq.core.paging.Page;
@@ -26,9 +27,13 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.SoftValueHashMap;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
/**
* A PageProviderIMpl
+ *
+ * TODO: this may be moved entirely into PagingStore as there's an one-to-one
relationship here
+ * However I want to keep this isolated as much as possible during development
*
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
@@ -45,6 +50,8 @@
private final StorageManager storageManager;
private SoftValueHashMap<Long, PageCacheImpl> softCache = new
SoftValueHashMap<Long, PageCacheImpl>();
+
+ private ConcurrentMap<Long, PageCursor> activeCursors = new
ConcurrentHashMap<Long, PageCursor>();
// Static --------------------------------------------------------
@@ -66,26 +73,23 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
*/
- public PageCursor createCursor(long cursorId)
+ public PageCursor getCursor(long cursorID)
{
- return new PageCursorImpl(this, pagingStore, storageManager, cursorId);
+ PageCursor activeCursor = activeCursors.get(cursorID);
+ if (activeCursor == null)
+ {
+ activeCursor = activeCursors.putIfAbsent(cursorID, new PageCursorImpl(this,
pagingStore, storageManager, cursorID));
+ }
+
+ return activeCursor;
}
-
public PageCursor createCursor()
{
return new PageCursorImpl(this, pagingStore, storageManager, 0);
}
/* (non-Javadoc)
- * @see
org.hornetq.core.paging.cursor.PageCursorProvider#recoverCursor(org.hornetq.core.paging.cursor.PagePosition)
- */
- public PageCursor recoverCursor(final PagePositionImpl position)
- {
- return null;
- }
-
- /* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
*/
public Pair<PagePosition, ServerMessage> getAfter(final PagePosition pos) throws
Exception
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -13,9 +13,7 @@
package org.hornetq.core.paging.cursor.impl;
-import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.utils.DataConstants;
/**
* A PagePosition
@@ -50,7 +48,7 @@
*/
public PagePositionImpl()
{
-
+
}
/**
@@ -126,4 +124,38 @@
{
return this.pageNr == pos.getPageNr() && this.getRecordID() -
pos.getRecordID() == 1;
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + messageNr;
+ result = prime * result + (int)(pageNr ^ (pageNr >>> 32));
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ PagePositionImpl other = (PagePositionImpl)obj;
+ if (messageNr != other.messageNr)
+ return false;
+ if (pageNr != other.pageNr)
+ return false;
+ return true;
+ }
+
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -40,6 +40,8 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.PagingStoreFactory;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
@@ -48,8 +50,8 @@
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
/**
@@ -114,6 +116,8 @@
/** duplicate cache used at this address */
private final DuplicateIDCache duplicateCache;
+
+ private final PageCursorProvider cursorProvider;
/**
* We need to perform checks on currentPage with minimal locking
@@ -187,6 +191,8 @@
this.storeFactory = storeFactory;
this.syncNonTransactional = syncNonTransactional;
+
+ this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager);
// Post office could be null on the backup node
if (postOffice == null)
@@ -204,6 +210,11 @@
// PagingStore implementation ------------------------------------
+ public PageCursorProvider getCursorProvier()
+ {
+ return cursorProvider;
+ }
+
public long getFirstPage()
{
return firstPageId;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -151,6 +151,7 @@
final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
+ Map<Long, QueueBindingInfo>
queueInfos,
final Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap) throws Exception;
long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -50,6 +50,8 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageCursor;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
@@ -136,7 +138,7 @@
public static final byte HEURISTIC_COMPLETION = 38;
- public static final byte ACKNOWLEDGE_PAGING = 39;
+ public static final byte ACKNOWLEDGE_CURSOR = 39;
private UUID persistentID;
@@ -517,7 +519,7 @@
{
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
- messageJournal.appendAddRecord(ackID, ACKNOWLEDGE_PAGING, new
CursorAckRecordEncoding(queueID, position), syncNonTransactional,
getContext(syncNonTransactional));
+ messageJournal.appendAddRecord(ackID, ACKNOWLEDGE_CURSOR, new
CursorAckRecordEncoding(queueID, position), syncNonTransactional,
getContext(syncNonTransactional));
}
@@ -627,7 +629,7 @@
{
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
- messageJournal.appendAddRecordTransactional(txID, ackID, ACKNOWLEDGE_PAGING, new
CursorAckRecordEncoding(queueID, position));
+ messageJournal.appendAddRecordTransactional(txID, ackID, ACKNOWLEDGE_CURSOR, new
CursorAckRecordEncoding(queueID, position));
}
public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws
Exception
@@ -786,6 +788,7 @@
final PagingManager pagingManager,
final ResourceManager
resourceManager,
final Map<Long, Queue> queues,
+ Map<Long, QueueBindingInfo>
queueInfos,
final Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
{
List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -1002,6 +1005,29 @@
resourceManager.putHeuristicCompletion(record.id, encoding.xid,
encoding.isCommit);
break;
}
+ case ACKNOWLEDGE_CURSOR:
+ {
+ CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
+ encoding.decode(buff);
+
+ encoding.position.setRecordID(record.id);
+
+ QueueBindingInfo queueInfo = queueInfos.get(encoding.queueID);
+
+ if (queueInfo != null)
+ {
+ SimpleString address = queueInfo.getAddress();
+ PagingStore store = pagingManager.getPageStore(address);
+ PageCursor cursor =
store.getCursorProvier().getCursor(encoding.queueID);
+ cursor.recoverACK(encoding.position);
+ }
+ else
+ {
+ log.warn("Can't find queue " + queueInfo.getId() + "
while reloading ACKNOWLEDGE_CURSOR");
+ }
+
+ break;
+ }
default:
{
throw new IllegalStateException("Invalid record type " +
recordType);
@@ -1536,6 +1562,12 @@
break;
}
+ case ACKNOWLEDGE_CURSOR:
+ {
+ // TODO: implement and test this case
+ // and make sure the rollback will work well also
+ break;
+ }
default:
{
JournalStorageManager.log.warn("InternalError: Record type "
+ recordType +
@@ -2229,12 +2261,8 @@
long scheduledDeliveryTime;
int deliveryCount;
-
- boolean referenced = false;
}
-
-
private static final class CursorAckRecordEncoding implements EncodingSupport
{
public CursorAckRecordEncoding(final long queueID, final PagePosition position)
@@ -2242,6 +2270,11 @@
this.queueID = queueID;
this.position = position;
}
+
+ public CursorAckRecordEncoding()
+ {
+ this.position = new PagePositionImpl();
+ }
long queueID;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -268,6 +268,7 @@
final PagingManager pagingManager,
final ResourceManager
resourceManager,
final Map<Long, Queue> queues,
+ Map<Long, QueueBindingInfo>
queueInfos,
final Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
{
return new JournalLoadInformation();
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -61,6 +61,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
@@ -163,7 +164,7 @@
private volatile QueueFactory queueFactory;
private volatile PagingManager pagingManager;
-
+
private volatile PostOffice postOffice;
private volatile ExecutorService threadPool;
@@ -1188,9 +1189,12 @@
setNodeID();
Map<Long, Queue> queues = new HashMap<Long, Queue>();
+ Map<Long, QueueBindingInfo> queueBindingInfosMap = new HashMap<Long,
QueueBindingInfo>();
for (QueueBindingInfo queueBindingInfo : queueBindingInfos)
{
+ queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
+
Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
@@ -1226,6 +1230,7 @@
pagingManager,
resourceManager,
queues,
+ queueBindingInfosMap,
duplicateIDMap);
for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry :
duplicateIDMap.entrySet())
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -61,6 +61,7 @@
// Public --------------------------------------------------------
+ // Read more cache than what would fit on the memory, and validate if the memory would
be cleared through soft-caches
public void testReadCache() throws Exception
{
@@ -132,6 +133,32 @@
assertNull(cache);
}
+
+
+ public void testRollbackScenarios() throws Exception
+ {
+
+ }
+
+ public void testRedeliveryScenarios() throws Exception
+ {
+
+ }
+
+ public void testCleanupScenarios() throws Exception
+ {
+ // Validate the pages are being cleared (with multiple cursors)
+ }
+
+ public void testLeavePageStateAndRestart() throws Exception
+ {
+ // Validate the cursor are working fine when all the pages are gone, and then
paging being restarted
+ }
+
+ public void testRedeliveryWithCleanup() throws Exception
+ {
+
+ }
/**
* @param numMessages
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -76,7 +76,7 @@
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new
ArrayList<GroupingInfo>());
- journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null);
+ journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null);
assertEquals(98, deletedMessage.size());
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -101,7 +101,7 @@
Map<Long, Queue> queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(postOffice, null, null, queues, null);
+ journal.loadMessageJournal(postOffice, null, null, queues, null, null);
journal.stop();
@@ -111,7 +111,7 @@
queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(postOffice, null, null, queues, null);
+ journal.loadMessageJournal(postOffice, null, null, queues, null, null);
queueBindingInfos = new ArrayList<QueueBindingInfo>();
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -128,7 +128,7 @@
Map<Long, Queue> queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null);
+ journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null);
}
/**
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -1154,6 +1154,7 @@
final PagingManager
pagingManager,
final ResourceManager
resourceManager,
final Map<Long, Queue>
queues,
+ Map<Long, QueueBindingInfo>
queueInfos,
final Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
{
return new JournalLoadInformation();
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2010-10-04
19:35:39 UTC (rev 9744)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2010-10-04
22:20:52 UTC (rev 9745)
@@ -111,6 +111,7 @@
new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
+ null,
mapDups);
Assert.assertEquals(0, mapDups.size());
@@ -132,6 +133,7 @@
new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
+ null,
mapDups);
Assert.assertEquals(1, mapDups.size());
@@ -160,6 +162,7 @@
new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
+ null,
mapDups);
Assert.assertEquals(1, mapDups.size());