[hornetq-commits] JBoss hornetq SVN: r9831 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 1 22:18:29 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-01 22:18:28 -0400 (Mon, 01 Nov 2010)
New Revision: 9831

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/Queue.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/QueueFactory.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
Log:
tweaks

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -49,16 +49,10 @@
     * @param queueId The cursorID should be the same as the queueId associated for persistance
     * @return
     */
-   PageSubscription getPersistentCursor(long queueId);
+   PageSubscription getSubscription(long queueId);
    
-   PageSubscription createPersistentSubscription(long queueId, Filter filter);
+   PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
    
-   /**
-    * Create a non persistent cursor, usually associated with browsing
-    * @return
-    */
-   PageSubscription createNonPersistentSubscription(Filter filter);
-
    Pair<PagePosition, PagedMessage> getNext(PageSubscription cursor, PagePosition pos) throws Exception;
    
    PagedMessage getMessage(PagePosition pos) throws Exception;

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -29,56 +29,58 @@
 {
 
    // Cursor query operations --------------------------------------
-   
+
    // To be called before the server is down
    void stop();
-   
+
    void bookmark(PagePosition position) throws Exception;
-   
-   /** It will be 0 if non persistent cursor */
-   public long getId();
-   
+
+   long getId();
+
+   boolean isPersistent();
+
    public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator();
-   
+
    // To be called when the cursor is closed for good. Most likely when the queue is deleted
    void close() throws Exception;
-   
+
    void scheduleCleanupCheck();
-   
+
    void cleanupEntries() throws Exception;
-   
+
    void disableAutoCleanup();
-   
+
    void enableAutoCleanup();
 
    void ack(PagePosition position) throws Exception;
 
    void ackTx(Transaction tx, PagePosition position) throws Exception;
+
    /**
     * 
     * @return the first page in use or MAX_LONG if none is in use
     */
    long getFirstPage();
-   
+
    // Reload operations
-   
+
    /**
     * @param position
     */
    void reloadACK(PagePosition position);
-   
+
    /**
     * To be called when the cursor decided to ignore a position.
     * @param position
     */
    void positionIgnored(PagePosition position);
-   
+
    /**
     * To be used to avoid a redelivery of a prepared ACK after load
     * @param position
     */
    void reloadPreparedACK(Transaction tx, PagePosition position);
-   
+
    void processReload() throws Exception;
 
    /**
@@ -86,7 +88,7 @@
     * @param position
     */
    void redeliver(PagePosition position);
-   
+
    void printDebug();
 
    /**

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -72,8 +72,6 @@
 
    private ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap<Long, PageSubscription>();
 
-   private ConcurrentSet<PageSubscription> nonPersistentCursors = new ConcurrentHashSet<PageSubscription>();
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -96,7 +94,7 @@
       return pagingStore;
    }
 
-   public synchronized PageSubscription createPersistentSubscription(long cursorID, Filter filter)
+   public synchronized PageSubscription createSubscription(long cursorID, Filter filter, boolean persistent)
    {
       PageSubscription activeCursor = activeCursors.get(cursorID);
       if (activeCursor != null)
@@ -109,7 +107,8 @@
                                         storageManager,
                                         executorFactory.getExecutor(),
                                         filter,
-                                        cursorID);
+                                        cursorID,
+                                        persistent);
       activeCursors.put(cursorID, activeCursor);
       return activeCursor;
    }
@@ -117,26 +116,11 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
     */
-   public synchronized PageSubscription getPersistentCursor(long cursorID)
+   public synchronized PageSubscription getSubscription(long cursorID)
    {
       return activeCursors.get(cursorID);
    }
 
-   /**
-    * this will create a non-persistent cursor
-    */
-   public synchronized PageSubscription createNonPersistentSubscription(Filter filter)
-   {
-      PageSubscription cursor = new PageSubscriptionImpl(this,
-                                             pagingStore,
-                                             storageManager,
-                                             executorFactory.getExecutor(),
-                                             filter,
-                                             0);
-      nonPersistentCursors.add(cursor);
-      return cursor;
-   }
-
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
     */
@@ -276,11 +260,6 @@
          cursor.stop();
       }
 
-      for (PageSubscription cursor : nonPersistentCursors)
-      {
-         cursor.stop();
-      }
-
       Future future = new Future();
 
       executor.execute(future);
@@ -299,11 +278,6 @@
          cursor.flushExecutors();
       }
 
-      for (PageSubscription cursor : nonPersistentCursors)
-      {
-         cursor.flushExecutors();
-      }
-
       Future future = new Future();
 
       executor.execute(future);
@@ -317,14 +291,7 @@
 
    public void close(PageSubscription cursor)
    {
-      if (cursor.getId() != 0)
-      {
-         activeCursors.remove(cursor.getId());
-      }
-      else
-      {
-         nonPersistentCursors.remove(cursor);
-      }
+      activeCursors.remove(cursor.getId());
 
       scheduleCleanup();
    }
@@ -361,7 +328,6 @@
 
             ArrayList<PageSubscription> cursorList = new ArrayList<PageSubscription>();
             cursorList.addAll(activeCursors.values());
-            cursorList.addAll(nonPersistentCursors);
 
             long minPage = checkMinPage(cursorList);
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -76,6 +76,8 @@
    private final StorageManager store;
 
    private final long cursorId;
+   
+   private final boolean persistent;
 
    private final Filter filter;
 
@@ -105,7 +107,8 @@
                          final StorageManager store,
                          final Executor executor,
                          final Filter filter,
-                         final long cursorId)
+                         final long cursorId,
+                         final boolean persistent)
    {
       this.pageStore = pageStore;
       this.store = store;
@@ -113,6 +116,7 @@
       this.cursorId = cursorId;
       this.executor = executor;
       this.filter = filter;
+      this.persistent = persistent;
    }
 
    // Public --------------------------------------------------------
@@ -317,7 +321,7 @@
    {
 
       // if we are dealing with a persistent cursor
-      if (cursorId != 0)
+      if (persistent)
       {
          store.storeCursorAcknowledge(cursorId, position);
       }
@@ -339,7 +343,7 @@
    public void ackTx(final Transaction tx, final PagePosition position) throws Exception
    {
       // if the cursor is persistent
-      if (cursorId != 0)
+      if (persistent)
       {
          store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
       }
@@ -490,6 +494,11 @@
    {
       return cursorId;
    }
+   
+   public boolean isPersistent()
+   {
+      return persistent;
+   }
 
    public void processReload() throws Exception
    {

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -1028,7 +1028,7 @@
                {
                   SimpleString address = queueInfo.getAddress();
                   PagingStore store = pagingManager.getPageStore(address);
-                  PageSubscription cursor = store.getCursorProvier().getPersistentCursor(encoding.queueID);
+                  PageSubscription cursor = store.getCursorProvier().getSubscription(encoding.queueID);
                   cursor.reloadACK(encoding.position);
                }
                else

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/Queue.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/Queue.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -20,6 +20,7 @@
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.filter.Filter;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.transaction.Transaction;
 
 /**
@@ -38,6 +39,8 @@
    long getID();
 
    Filter getFilter();
+   
+   PageSubscription getPageSubscription();
 
    boolean isDurable();
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/QueueFactory.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/QueueFactory.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/QueueFactory.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -15,6 +15,7 @@
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.filter.Filter;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.postoffice.PostOffice;
 
 /**
@@ -33,6 +34,7 @@
                      final SimpleString address,
                      SimpleString name,
                      Filter filter,
+                     PageSubscription pageSubscription,
                      boolean durable,
                      boolean temporary);
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -61,6 +61,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.management.impl.HornetQServerControlImpl;
 import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.persistence.GroupingInfo;
@@ -712,6 +713,8 @@
       }
 
       Queue queue = (Queue)binding.getBindable();
+      
+      queue.getPageSubscription().close();
 
       if (queue.getConsumerCount() != 0)
       {
@@ -1198,10 +1201,13 @@
          
          Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
 
+         PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(), filter, true);
+         
          Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
                                                 queueBindingInfo.getAddress(),
                                                 queueBindingInfo.getQueueName(),
                                                 filter,
+                                                subscription,
                                                 true,
                                                 false);
 
@@ -1214,7 +1220,7 @@
          managementService.registerAddress(queueBindingInfo.getAddress());
          managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
          
-         pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createPersistentSubscription(queue.getID(), filter);
+         
       }
 
       for (GroupingInfo groupingInfo : groupingInfos)
@@ -1336,11 +1342,16 @@
       }
 
       Filter filter = FilterImpl.createFilter(filterString);
+      
+      long queueID = storageManager.generateUniqueID();
 
-      final Queue queue = queueFactory.createQueue(storageManager.generateUniqueID(),
+      PageSubscription pageSubscription = pagingManager.getPageStore(address).getCursorProvier().createSubscription(queueID, filter, durable);
+
+      final Queue queue = queueFactory.createQueue(queueID,
                                                    address,
                                                    queueName,
                                                    filter,
+                                                   pageSubscription,
                                                    durable,
                                                    temporary);
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -21,6 +21,7 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.MessageReference;
@@ -49,6 +50,7 @@
                          final SimpleString address,
                          final SimpleString name,
                          final Filter filter,
+                         final PageSubscription pageSubscription,
                          final boolean durable,
                          final boolean temporary,
                          final ScheduledExecutorService scheduledExecutor,
@@ -61,6 +63,7 @@
             address,
             name,
             filter,
+            pageSubscription,
             durable,
             temporary,
             scheduledExecutor,

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -17,6 +17,7 @@
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.filter.Filter;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.Queue;
@@ -69,6 +70,7 @@
                             final SimpleString address,
                             final SimpleString name,
                             final Filter filter,
+                            final PageSubscription pageSubscription,
                             final boolean durable,
                             final boolean temporary)
    {
@@ -81,6 +83,7 @@
                                     address,
                                     name,
                                     filter,
+                                    pageSubscription,
                                     durable,
                                     temporary,
                                     scheduledExecutor,
@@ -95,6 +98,7 @@
                                address,
                                name,
                                filter,
+                               pageSubscription,
                                durable,
                                temporary,
                                scheduledExecutor,

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -32,6 +32,7 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
@@ -89,6 +90,8 @@
    private final boolean temporary;
 
    private final PostOffice postOffice;
+   
+   private final PageSubscription pageSubscription;
 
    private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new ConcurrentLinkedQueue<MessageReference>();
 
@@ -141,11 +144,39 @@
    private volatile boolean checkDirect;
 
    private volatile boolean directDeliver = true;
+   
+   public QueueImpl(final long id,
+                    final SimpleString address,
+                    final SimpleString name,
+                    final Filter filter,
+                    final boolean durable,
+                    final boolean temporary,
+                    final ScheduledExecutorService scheduledExecutor,
+                    final PostOffice postOffice,
+                    final StorageManager storageManager,
+                    final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                    final Executor executor)
+   {
+      this(id,
+          address,
+          name,
+          filter,
+          null,
+          durable,
+          temporary,
+          scheduledExecutor,
+          postOffice,
+          storageManager,
+          addressSettingsRepository,
+          executor);
+   }
 
+
    public QueueImpl(final long id,
                     final SimpleString address,
                     final SimpleString name,
                     final Filter filter,
+                    final PageSubscription pageSubscription,
                     final boolean durable,
                     final boolean temporary,
                     final ScheduledExecutorService scheduledExecutor,
@@ -161,6 +192,8 @@
       this.name = name;
 
       this.filter = filter;
+      
+      this.pageSubscription = pageSubscription;
 
       this.durable = durable;
 
@@ -244,6 +277,11 @@
       return id;
    }
 
+   public PageSubscription getPageSubscription()
+   {
+      return pageSubscription;
+   }
+   
    public Filter getFilter()
    {
       return filter;

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -65,6 +65,7 @@
                                              new SimpleString("address1"),
                                              new SimpleString("queue1"),
                                              null,
+                                             null,
                                              false,
                                              false);
 

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -117,7 +117,7 @@
 
       System.out.println("NumberOfPages = " + numberOfPages);
 
-      PageSubscription cursor = createNonPersistentCursor();
+      PageSubscription cursor = lookupPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
 
       Pair<PagePosition, PagedMessage> msg;
 
@@ -202,6 +202,8 @@
          }
 
       });
+      
+      queue.getPageSubscription().close();
 
       Pair<PagePosition, PagedMessage> msg;
 
@@ -268,15 +270,10 @@
 
       PageCursorProvider cursorProvider = lookupCursorProvider();
 
-      // TODO: We should be using getPersisentCursor here but I can't change the method here until createQueue is not
-      // creating the cursor also
-      // need to change this after some integration
-      // PageCursor cursor =
-      // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
       PageSubscription cursor = this.server.getPagingManager()
                                      .getPageStore(ADDRESS)
                                      .getCursorProvier()
-                                     .createPersistentSubscription(queue.getID(), null);
+                                     .getSubscription(queue.getID());
 
       PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager()
                                                                                    .getPageStore(ADDRESS)
@@ -322,7 +319,7 @@
       cursor = this.server.getPagingManager()
                           .getPageStore(ADDRESS)
                           .getCursorProvier()
-                          .getPersistentCursor(queue.getID());
+                          .getSubscription(queue.getID());
 
       iterator = cursor.iterator();
       
@@ -373,7 +370,7 @@
       PageSubscription cursor = this.server.getPagingManager()
                                      .getPageStore(ADDRESS)
                                      .getCursorProvier()
-                                     .createPersistentSubscription(queue.getID(), null);
+                                     .getSubscription(queue.getID());
 
       System.out.println("Cursor: " + cursor);
       LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
@@ -396,7 +393,7 @@
       cursor = this.server.getPagingManager()
                           .getPageStore(ADDRESS)
                           .getCursorProvier()
-                          .getPersistentCursor(queue.getID());
+                          .getSubscription(queue.getID());
       iterator = cursor.iterator();
 
       for (int i = 10; i <= 20; i++)
@@ -439,7 +436,7 @@
       PageSubscription cursor = this.server.getPagingManager()
                                      .getPageStore(ADDRESS)
                                      .getCursorProvier()
-                                     .createPersistentSubscription(queue.getID(), null);
+                                     .getSubscription(queue.getID());
 
       System.out.println("Cursor: " + cursor);
 
@@ -468,7 +465,7 @@
       cursor = this.server.getPagingManager()
                           .getPageStore(ADDRESS)
                           .getCursorProvier()
-                          .getPersistentCursor(queue.getID());
+                          .getSubscription(queue.getID());
 
       tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
       iterator = cursor.iterator();
@@ -517,7 +514,7 @@
       PageSubscription cursor = this.server.getPagingManager()
                                      .getPageStore(ADDRESS)
                                      .getCursorProvier()
-                                     .createPersistentSubscription(queue.getID(), null);
+                                     .getSubscription(queue.getID());
 
       System.out.println("Cursor: " + cursor);
 
@@ -557,7 +554,7 @@
       cursor = this.server.getPagingManager()
                           .getPageStore(ADDRESS)
                           .getCursorProvier()
-                          .getPersistentCursor(queue.getID());
+                          .getSubscription(queue.getID());
       iterator = cursor.iterator();
 
       for (int i = 0; i < NUM_MESSAGES * 2; i++)
@@ -596,7 +593,7 @@
       cursor = this.server.getPagingManager()
                           .getPageStore(ADDRESS)
                           .getCursorProvier()
-                          .getPersistentCursor(queue.getID());
+                          .getSubscription(queue.getID());
       iterator = cursor.iterator();
 
       for (int i = 0; i < NUM_MESSAGES * 3; i++)
@@ -687,7 +684,7 @@
       PageSubscription cursor = this.server.getPagingManager()
                                      .getPageStore(ADDRESS)
                                      .getCursorProvier()
-                                     .createPersistentSubscription(queue.getID(), null);
+                                     .getSubscription(queue.getID());
       LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
 
       System.out.println("Cursor: " + cursor);
@@ -760,8 +757,10 @@
 
       PageCursorProvider cursorProvider = lookupCursorProvider();
 
-      PageSubscription cursor = cursorProvider.createNonPersistentSubscription(null);
-      PageSubscriptionImpl cursor2 = (PageSubscriptionImpl)cursorProvider.createNonPersistentSubscription(null);
+      PageSubscription cursor = cursorProvider.createSubscription(1, null, false);
+      PageSubscriptionImpl cursor2 = (PageSubscriptionImpl)cursorProvider.createSubscription(2, null, false);
+      
+      queue.getPageSubscription().close();
 
       Pair<PagePosition, PagedMessage> msg;
       LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
@@ -832,7 +831,10 @@
 
       PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
 
-      PageSubscription cursor = cursorProvider.createNonPersistentSubscription(null);
+      PageSubscription cursor = cursorProvider.createSubscription(2, null, false);
+      
+      queue.getPageSubscription().close();
+      
       PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
       cursor.bookmark(startingPos);
       PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
@@ -880,7 +882,7 @@
       // need to change this after some integration
       // PageCursor cursor =
       // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-      PageSubscription cursor = cursorProvider.createPersistentSubscription(queue.getID(), null);
+      PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
       PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
       cursor.bookmark(startingPos);
       PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
@@ -908,7 +910,7 @@
       createServer();
 
       cursorProvider = lookupCursorProvider();
-      cursor = cursorProvider.getPersistentCursor(queue.getID());
+      cursor = cursorProvider.getSubscription(queue.getID());
       key = initialKey;
       iterator = cursor.iterator();
       while ((msgCursor = iterator.next()) != null)
@@ -1015,7 +1017,7 @@
     */
    private PageSubscription createNonPersistentCursor() throws Exception
    {
-      return lookupCursorProvider().createNonPersistentSubscription(null);
+      return lookupCursorProvider().createSubscription(server.getStorageManager().generateUniqueID(), null, false);
    }
 
    /**
@@ -1024,7 +1026,7 @@
     */
    private PageSubscription createNonPersistentCursor(Filter filter) throws Exception
    {
-      return lookupCursorProvider().createNonPersistentSubscription(filter);
+      return lookupCursorProvider().createSubscription(server.getStorageManager().generateUniqueID(), filter, false);
    }
 
    /**

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -70,6 +70,7 @@
                                   new SimpleString("address1"),
                                   new SimpleString("queue1"),
                                   null,
+                                  null,
                                   false,
                                   true,
                                   scheduledExecutor,
@@ -145,6 +146,7 @@
                                   new SimpleString("address1"),
                                   new SimpleString("queue1"),
                                   null,
+                                  null,
                                   false,
                                   true,
                                   scheduledExecutor,
@@ -253,6 +255,7 @@
                                   new SimpleString("address1"),
                                   QueueImplTest.queue1,
                                   null,
+                                  null,
                                   false,
                                   true,
                                   scheduledExecutor,

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -20,6 +20,7 @@
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.filter.Filter;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.server.Consumer;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
@@ -591,4 +592,13 @@
       
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#getPageSubscription()
+    */
+   public PageSubscription getPageSubscription()
+   {
+      // TODO Auto-generated method stub
+      return null;
+   }
+
 }
\ No newline at end of file

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java	2010-11-02 00:38:52 UTC (rev 9830)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java	2010-11-02 02:18:28 UTC (rev 9831)
@@ -19,6 +19,7 @@
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.filter.Filter;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.QueueFactory;
@@ -43,6 +44,7 @@
                             final SimpleString address,
                             final SimpleString name,
                             final Filter filter,
+                            final PageSubscription subscription,
                             final boolean durable,
                             final boolean temporary)
    {
@@ -50,6 +52,7 @@
                            address,
                            name,
                            filter,
+                           subscription,
                            durable,
                            temporary,
                            scheduledExecutor,



More information about the hornetq-commits mailing list