Author: clebert.suconic(a)jboss.com
Date: 2010-11-01 22:18:28 -0400 (Mon, 01 Nov 2010)
New Revision: 9831
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/Queue.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/QueueFactory.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
Log:
tweaks
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -49,16 +49,10 @@
* @param queueId The cursorID should be the same as the queueId associated for
persistance
* @return
*/
- PageSubscription getPersistentCursor(long queueId);
+ PageSubscription getSubscription(long queueId);
- PageSubscription createPersistentSubscription(long queueId, Filter filter);
+ PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
- /**
- * Create a non persistent cursor, usually associated with browsing
- * @return
- */
- PageSubscription createNonPersistentSubscription(Filter filter);
-
Pair<PagePosition, PagedMessage> getNext(PageSubscription cursor, PagePosition
pos) throws Exception;
PagedMessage getMessage(PagePosition pos) throws Exception;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -29,56 +29,58 @@
{
// Cursor query operations --------------------------------------
-
+
// To be called before the server is down
void stop();
-
+
void bookmark(PagePosition position) throws Exception;
-
- /** It will be 0 if non persistent cursor */
- public long getId();
-
+
+ long getId();
+
+ boolean isPersistent();
+
public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator();
-
+
// To be called when the cursor is closed for good. Most likely when the queue is
deleted
void close() throws Exception;
-
+
void scheduleCleanupCheck();
-
+
void cleanupEntries() throws Exception;
-
+
void disableAutoCleanup();
-
+
void enableAutoCleanup();
void ack(PagePosition position) throws Exception;
void ackTx(Transaction tx, PagePosition position) throws Exception;
+
/**
*
* @return the first page in use or MAX_LONG if none is in use
*/
long getFirstPage();
-
+
// Reload operations
-
+
/**
* @param position
*/
void reloadACK(PagePosition position);
-
+
/**
* To be called when the cursor decided to ignore a position.
* @param position
*/
void positionIgnored(PagePosition position);
-
+
/**
* To be used to avoid a redelivery of a prepared ACK after load
* @param position
*/
void reloadPreparedACK(Transaction tx, PagePosition position);
-
+
void processReload() throws Exception;
/**
@@ -86,7 +88,7 @@
* @param position
*/
void redeliver(PagePosition position);
-
+
void printDebug();
/**
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -72,8 +72,6 @@
private ConcurrentMap<Long, PageSubscription> activeCursors = new
ConcurrentHashMap<Long, PageSubscription>();
- private ConcurrentSet<PageSubscription> nonPersistentCursors = new
ConcurrentHashSet<PageSubscription>();
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -96,7 +94,7 @@
return pagingStore;
}
- public synchronized PageSubscription createPersistentSubscription(long cursorID,
Filter filter)
+ public synchronized PageSubscription createSubscription(long cursorID, Filter filter,
boolean persistent)
{
PageSubscription activeCursor = activeCursors.get(cursorID);
if (activeCursor != null)
@@ -109,7 +107,8 @@
storageManager,
executorFactory.getExecutor(),
filter,
- cursorID);
+ cursorID,
+ persistent);
activeCursors.put(cursorID, activeCursor);
return activeCursor;
}
@@ -117,26 +116,11 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
*/
- public synchronized PageSubscription getPersistentCursor(long cursorID)
+ public synchronized PageSubscription getSubscription(long cursorID)
{
return activeCursors.get(cursorID);
}
- /**
- * this will create a non-persistent cursor
- */
- public synchronized PageSubscription createNonPersistentSubscription(Filter filter)
- {
- PageSubscription cursor = new PageSubscriptionImpl(this,
- pagingStore,
- storageManager,
- executorFactory.getExecutor(),
- filter,
- 0);
- nonPersistentCursors.add(cursor);
- return cursor;
- }
-
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
*/
@@ -276,11 +260,6 @@
cursor.stop();
}
- for (PageSubscription cursor : nonPersistentCursors)
- {
- cursor.stop();
- }
-
Future future = new Future();
executor.execute(future);
@@ -299,11 +278,6 @@
cursor.flushExecutors();
}
- for (PageSubscription cursor : nonPersistentCursors)
- {
- cursor.flushExecutors();
- }
-
Future future = new Future();
executor.execute(future);
@@ -317,14 +291,7 @@
public void close(PageSubscription cursor)
{
- if (cursor.getId() != 0)
- {
- activeCursors.remove(cursor.getId());
- }
- else
- {
- nonPersistentCursors.remove(cursor);
- }
+ activeCursors.remove(cursor.getId());
scheduleCleanup();
}
@@ -361,7 +328,6 @@
ArrayList<PageSubscription> cursorList = new
ArrayList<PageSubscription>();
cursorList.addAll(activeCursors.values());
- cursorList.addAll(nonPersistentCursors);
long minPage = checkMinPage(cursorList);
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -76,6 +76,8 @@
private final StorageManager store;
private final long cursorId;
+
+ private final boolean persistent;
private final Filter filter;
@@ -105,7 +107,8 @@
final StorageManager store,
final Executor executor,
final Filter filter,
- final long cursorId)
+ final long cursorId,
+ final boolean persistent)
{
this.pageStore = pageStore;
this.store = store;
@@ -113,6 +116,7 @@
this.cursorId = cursorId;
this.executor = executor;
this.filter = filter;
+ this.persistent = persistent;
}
// Public --------------------------------------------------------
@@ -317,7 +321,7 @@
{
// if we are dealing with a persistent cursor
- if (cursorId != 0)
+ if (persistent)
{
store.storeCursorAcknowledge(cursorId, position);
}
@@ -339,7 +343,7 @@
public void ackTx(final Transaction tx, final PagePosition position) throws Exception
{
// if the cursor is persistent
- if (cursorId != 0)
+ if (persistent)
{
store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
}
@@ -490,6 +494,11 @@
{
return cursorId;
}
+
+ public boolean isPersistent()
+ {
+ return persistent;
+ }
public void processReload() throws Exception
{
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -1028,7 +1028,7 @@
{
SimpleString address = queueInfo.getAddress();
PagingStore store = pagingManager.getPageStore(address);
- PageSubscription cursor =
store.getCursorProvier().getPersistentCursor(encoding.queueID);
+ PageSubscription cursor =
store.getCursorProvier().getSubscription(encoding.queueID);
cursor.reloadACK(encoding.position);
}
else
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/Queue.java 2010-11-02
00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/Queue.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -20,6 +20,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.filter.Filter;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.transaction.Transaction;
/**
@@ -38,6 +39,8 @@
long getID();
Filter getFilter();
+
+ PageSubscription getPageSubscription();
boolean isDurable();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/QueueFactory.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/QueueFactory.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/QueueFactory.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -15,6 +15,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.filter.Filter;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.postoffice.PostOffice;
/**
@@ -33,6 +34,7 @@
final SimpleString address,
SimpleString name,
Filter filter,
+ PageSubscription pageSubscription,
boolean durable,
boolean temporary);
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -61,6 +61,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
@@ -712,6 +713,8 @@
}
Queue queue = (Queue)binding.getBindable();
+
+ queue.getPageSubscription().close();
if (queue.getConsumerCount() != 0)
{
@@ -1198,10 +1201,13 @@
Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
+ PageSubscription subscription =
pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(),
filter, true);
+
Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
queueBindingInfo.getAddress(),
queueBindingInfo.getQueueName(),
filter,
+ subscription,
true,
false);
@@ -1214,7 +1220,7 @@
managementService.registerAddress(queueBindingInfo.getAddress());
managementService.registerQueue(queue, queueBindingInfo.getAddress(),
storageManager);
-
pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createPersistentSubscription(queue.getID(),
filter);
+
}
for (GroupingInfo groupingInfo : groupingInfos)
@@ -1336,11 +1342,16 @@
}
Filter filter = FilterImpl.createFilter(filterString);
+
+ long queueID = storageManager.generateUniqueID();
- final Queue queue = queueFactory.createQueue(storageManager.generateUniqueID(),
+ PageSubscription pageSubscription =
pagingManager.getPageStore(address).getCursorProvier().createSubscription(queueID, filter,
durable);
+
+ final Queue queue = queueFactory.createQueue(queueID,
address,
queueName,
filter,
+ pageSubscription,
durable,
temporary);
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.MessageReference;
@@ -49,6 +50,7 @@
final SimpleString address,
final SimpleString name,
final Filter filter,
+ final PageSubscription pageSubscription,
final boolean durable,
final boolean temporary,
final ScheduledExecutorService scheduledExecutor,
@@ -61,6 +63,7 @@
address,
name,
filter,
+ pageSubscription,
durable,
temporary,
scheduledExecutor,
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -17,6 +17,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.filter.Filter;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Queue;
@@ -69,6 +70,7 @@
final SimpleString address,
final SimpleString name,
final Filter filter,
+ final PageSubscription pageSubscription,
final boolean durable,
final boolean temporary)
{
@@ -81,6 +83,7 @@
address,
name,
filter,
+ pageSubscription,
durable,
temporary,
scheduledExecutor,
@@ -95,6 +98,7 @@
address,
name,
filter,
+ pageSubscription,
durable,
temporary,
scheduledExecutor,
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -32,6 +32,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -89,6 +90,8 @@
private final boolean temporary;
private final PostOffice postOffice;
+
+ private final PageSubscription pageSubscription;
private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new
ConcurrentLinkedQueue<MessageReference>();
@@ -141,11 +144,39 @@
private volatile boolean checkDirect;
private volatile boolean directDeliver = true;
+
+ public QueueImpl(final long id,
+ final SimpleString address,
+ final SimpleString name,
+ final Filter filter,
+ final boolean durable,
+ final boolean temporary,
+ final ScheduledExecutorService scheduledExecutor,
+ final PostOffice postOffice,
+ final StorageManager storageManager,
+ final HierarchicalRepository<AddressSettings>
addressSettingsRepository,
+ final Executor executor)
+ {
+ this(id,
+ address,
+ name,
+ filter,
+ null,
+ durable,
+ temporary,
+ scheduledExecutor,
+ postOffice,
+ storageManager,
+ addressSettingsRepository,
+ executor);
+ }
+
public QueueImpl(final long id,
final SimpleString address,
final SimpleString name,
final Filter filter,
+ final PageSubscription pageSubscription,
final boolean durable,
final boolean temporary,
final ScheduledExecutorService scheduledExecutor,
@@ -161,6 +192,8 @@
this.name = name;
this.filter = filter;
+
+ this.pageSubscription = pageSubscription;
this.durable = durable;
@@ -244,6 +277,11 @@
return id;
}
+ public PageSubscription getPageSubscription()
+ {
+ return pageSubscription;
+ }
+
public Filter getFilter()
{
return filter;
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -65,6 +65,7 @@
new SimpleString("address1"),
new SimpleString("queue1"),
null,
+ null,
false,
false);
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -117,7 +117,7 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageSubscription cursor = createNonPersistentCursor();
+ PageSubscription cursor =
lookupPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
Pair<PagePosition, PagedMessage> msg;
@@ -202,6 +202,8 @@
}
});
+
+ queue.getPageSubscription().close();
Pair<PagePosition, PagedMessage> msg;
@@ -268,15 +270,10 @@
PageCursorProvider cursorProvider = lookupCursorProvider();
- // TODO: We should be using getPersisentCursor here but I can't change the
method here until createQueue is not
- // creating the cursor also
- // need to change this after some integration
- // PageCursor cursor =
- //
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
PageSubscription cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .createPersistentSubscription(queue.getID(), null);
+ .getSubscription(queue.getID());
PageCache firstPage = cursorProvider.getPageCache(new
PagePositionImpl(server.getPagingManager()
.getPageStore(ADDRESS)
@@ -322,7 +319,7 @@
cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .getPersistentCursor(queue.getID());
+ .getSubscription(queue.getID());
iterator = cursor.iterator();
@@ -373,7 +370,7 @@
PageSubscription cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .createPersistentSubscription(queue.getID(), null);
+ .getSubscription(queue.getID());
System.out.println("Cursor: " + cursor);
LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
@@ -396,7 +393,7 @@
cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .getPersistentCursor(queue.getID());
+ .getSubscription(queue.getID());
iterator = cursor.iterator();
for (int i = 10; i <= 20; i++)
@@ -439,7 +436,7 @@
PageSubscription cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .createPersistentSubscription(queue.getID(), null);
+ .getSubscription(queue.getID());
System.out.println("Cursor: " + cursor);
@@ -468,7 +465,7 @@
cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .getPersistentCursor(queue.getID());
+ .getSubscription(queue.getID());
tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
iterator = cursor.iterator();
@@ -517,7 +514,7 @@
PageSubscription cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .createPersistentSubscription(queue.getID(), null);
+ .getSubscription(queue.getID());
System.out.println("Cursor: " + cursor);
@@ -557,7 +554,7 @@
cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .getPersistentCursor(queue.getID());
+ .getSubscription(queue.getID());
iterator = cursor.iterator();
for (int i = 0; i < NUM_MESSAGES * 2; i++)
@@ -596,7 +593,7 @@
cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .getPersistentCursor(queue.getID());
+ .getSubscription(queue.getID());
iterator = cursor.iterator();
for (int i = 0; i < NUM_MESSAGES * 3; i++)
@@ -687,7 +684,7 @@
PageSubscription cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .createPersistentSubscription(queue.getID(), null);
+ .getSubscription(queue.getID());
LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
System.out.println("Cursor: " + cursor);
@@ -760,8 +757,10 @@
PageCursorProvider cursorProvider = lookupCursorProvider();
- PageSubscription cursor = cursorProvider.createNonPersistentSubscription(null);
- PageSubscriptionImpl cursor2 =
(PageSubscriptionImpl)cursorProvider.createNonPersistentSubscription(null);
+ PageSubscription cursor = cursorProvider.createSubscription(1, null, false);
+ PageSubscriptionImpl cursor2 =
(PageSubscriptionImpl)cursorProvider.createSubscription(2, null, false);
+
+ queue.getPageSubscription().close();
Pair<PagePosition, PagedMessage> msg;
LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
@@ -832,7 +831,10 @@
PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
- PageSubscription cursor = cursorProvider.createNonPersistentSubscription(null);
+ PageSubscription cursor = cursorProvider.createSubscription(2, null, false);
+
+ queue.getPageSubscription().close();
+
PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() /
2);
cursor.bookmark(startingPos);
PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
@@ -880,7 +882,7 @@
// need to change this after some integration
// PageCursor cursor =
//
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
- PageSubscription cursor =
cursorProvider.createPersistentSubscription(queue.getID(), null);
+ PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() /
2);
cursor.bookmark(startingPos);
PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
@@ -908,7 +910,7 @@
createServer();
cursorProvider = lookupCursorProvider();
- cursor = cursorProvider.getPersistentCursor(queue.getID());
+ cursor = cursorProvider.getSubscription(queue.getID());
key = initialKey;
iterator = cursor.iterator();
while ((msgCursor = iterator.next()) != null)
@@ -1015,7 +1017,7 @@
*/
private PageSubscription createNonPersistentCursor() throws Exception
{
- return lookupCursorProvider().createNonPersistentSubscription(null);
+ return
lookupCursorProvider().createSubscription(server.getStorageManager().generateUniqueID(),
null, false);
}
/**
@@ -1024,7 +1026,7 @@
*/
private PageSubscription createNonPersistentCursor(Filter filter) throws Exception
{
- return lookupCursorProvider().createNonPersistentSubscription(filter);
+ return
lookupCursorProvider().createSubscription(server.getStorageManager().generateUniqueID(),
filter, false);
}
/**
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -70,6 +70,7 @@
new SimpleString("address1"),
new SimpleString("queue1"),
null,
+ null,
false,
true,
scheduledExecutor,
@@ -145,6 +146,7 @@
new SimpleString("address1"),
new SimpleString("queue1"),
null,
+ null,
false,
true,
scheduledExecutor,
@@ -253,6 +255,7 @@
new SimpleString("address1"),
QueueImplTest.queue1,
null,
+ null,
false,
true,
scheduledExecutor,
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -20,6 +20,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.filter.Filter;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -591,4 +592,13 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getPageSubscription()
+ */
+ public PageSubscription getPageSubscription()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
\ No newline at end of file
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2010-11-02
00:38:52 UTC (rev 9830)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2010-11-02
02:18:28 UTC (rev 9831)
@@ -19,6 +19,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.filter.Filter;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
@@ -43,6 +44,7 @@
final SimpleString address,
final SimpleString name,
final Filter filter,
+ final PageSubscription subscription,
final boolean durable,
final boolean temporary)
{
@@ -50,6 +52,7 @@
address,
name,
filter,
+ subscription,
durable,
temporary,
scheduledExecutor,