[hornetq-commits] JBoss hornetq SVN: r9834 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 2 23:13:24 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-02 23:13:23 -0400 (Tue, 02 Nov 2010)
New Revision: 9834

Added:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReference.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
renaming a few classes.. some renames.. etc

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-11-02 22:44:05 UTC (rev 9833)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-11-03 03:13:23 UTC (rev 9834)
@@ -53,7 +53,7 @@
    
    PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
    
-   Pair<PagePosition, PagedMessage> getNext(PageSubscription cursor, PagePosition pos) throws Exception;
+   PagedReferenceImpl getNext(PageSubscription cursor, PagePosition pos) throws Exception;
    
    PagedMessage getMessage(PagePosition pos) throws Exception;
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-11-02 22:44:05 UTC (rev 9833)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-11-03 03:13:23 UTC (rev 9834)
@@ -39,7 +39,7 @@
 
    boolean isPersistent();
 
-   public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator();
+   public LinkedListIterator<PagedReferenceImpl> iterator();
 
    // To be called when the cursor is closed for good. Most likely when the queue is deleted
    void close() throws Exception;
@@ -52,8 +52,14 @@
 
    void enableAutoCleanup();
 
-   void ack(PagePosition position) throws Exception;
+   void ack(PagedReference ref) throws Exception;
 
+   // for internal (cursor) classes
+   void ack(PagePosition ref) throws Exception;
+
+   void ackTx(Transaction tx, PagedReference position) throws Exception;
+
+   // for internal (cursor) classes
    void ackTx(Transaction tx, PagePosition position) throws Exception;
 
    /**

Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReference.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReference.java	                        (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReference.java	2010-11-03 03:13:23 UTC (rev 9834)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.server.MessageReference;
+
+/**
+ * A {@link MessageReference} that additionally exposes a
+ * {@link PagePosition} and a {@link PagedMessage}.
+ *
+ * @author clebert
+ *
+ */
+public interface PagedReference extends MessageReference
+{
+     PagePosition getPosition();
+     
+     PagedMessage getPagedMessage();
+}

Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	                        (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2010-11-03 03:13:23 UTC (rev 9834)
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
+
+/**
+ * {@link PagedReference} implementation that pairs a {@link PagePosition}
+ * with a {@link PagedMessage}. Only getMessage(), getPagedMessage() and
+ * getPosition() are implemented; every other MessageReference method below
+ * is still a placeholder stub (see the TODO in each body).
+ *
+ * @author clebert
+ */
+public class PagedReferenceImpl implements PagedReference
+{
+   
+   private static final long serialVersionUID = -8640232251318264710L; // NOTE(review): no Serializable is visible here - confirm this is needed
+
+   private PagePosition a; // the position, returned by getPosition()
+   private PagedMessage b; // the paged message, returned by getPagedMessage()
+   
+   
+   public ServerMessage getMessage()
+   {
+      return b.getMessage();
+   }
+   
+   public PagedMessage getPagedMessage()
+   {
+      return b;
+   }
+   
+   public PagePosition getPosition()
+   {
+      return a;
+   }
+
+   public PagedReferenceImpl(PagePosition a, PagedMessage b)
+   {
+      this.a = a;
+      this.b = b;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#copy(org.hornetq.core.server.Queue)
+    */
+   public MessageReference copy(Queue queue)
+   {
+      // TODO not implemented yet: no copy is made and null is returned
+      return null;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#getScheduledDeliveryTime()
+    */
+   public long getScheduledDeliveryTime()
+   {
+      // TODO not implemented yet: always returns 0
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#setScheduledDeliveryTime(long)
+    */
+   public void setScheduledDeliveryTime(long scheduledDeliveryTime)
+   {
+      // TODO not implemented yet: the supplied value is ignored
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#getDeliveryCount()
+    */
+   public int getDeliveryCount()
+   {
+      // TODO not implemented yet: always returns 0
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#setDeliveryCount(int)
+    */
+   public void setDeliveryCount(int deliveryCount)
+   {
+      // TODO not implemented yet: the supplied value is ignored
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#incrementDeliveryCount()
+    */
+   public void incrementDeliveryCount()
+   {
+      // TODO not implemented yet: currently a no-op
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#decrementDeliveryCount()
+    */
+   public void decrementDeliveryCount()
+   {
+      // TODO not implemented yet: currently a no-op
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#getQueue()
+    */
+   public Queue getQueue()
+   {
+      // TODO not implemented yet: always returns null
+      return null;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#handled()
+    */
+   public void handled()
+   {
+      // TODO not implemented yet: currently a no-op
+      
+   }
+}

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-02 22:44:05 UTC (rev 9833)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-03 03:13:23 UTC (rev 9834)
@@ -14,13 +14,11 @@
 package org.hornetq.core.paging.cursor.impl;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 
-import org.hornetq.api.core.Pair;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.Page;
@@ -29,12 +27,11 @@
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReferenceImpl;
 import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.utils.ConcurrentHashSet;
-import org.hornetq.utils.ConcurrentSet;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.Future;
 import org.hornetq.utils.SoftValueHashMap;
@@ -124,12 +121,12 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
     */
-   public Pair<PagePosition, PagedMessage> getNext(final PageSubscription cursor, PagePosition cursorPos) throws Exception
+   public PagedReferenceImpl getNext(final PageSubscription cursor, PagePosition cursorPos) throws Exception
    {
 
       while (true)
       {
-         Pair<PagePosition, PagedMessage> retPos = internalGetNext(cursorPos);
+         PagedReferenceImpl retPos = internalGetNext(cursorPos);
 
          if (retPos == null)
          {
@@ -137,15 +134,15 @@
          }
          else if (retPos != null)
          {
-            cursorPos = retPos.a;
-            if (retPos.b.getTransactionID() != 0)
+            cursorPos = retPos.getPosition();
+            if (retPos.getPagedMessage().getTransactionID() != 0)
             {
-               PageTransactionInfo tx = pagingManager.getTransaction(retPos.b.getTransactionID());
+               PageTransactionInfo tx = pagingManager.getTransaction(retPos.getPagedMessage().getTransactionID());
                if (tx == null)
                {
-                  log.warn("Couldn't locate page transaction " + retPos.b.getTransactionID() +
+                  log.warn("Couldn't locate page transaction " + retPos.getPagedMessage().getTransactionID() +
                            ", ignoring message on position " +
-                           retPos.a);
+                           retPos.getPosition());
                   cursor.positionIgnored(cursorPos);
                }
                else
@@ -164,7 +161,7 @@
       }
    }
 
-   private Pair<PagePosition, PagedMessage> internalGetNext(final PagePosition pos)
+   private PagedReferenceImpl internalGetNext(final PagePosition pos)
    {
       PagePosition retPos = pos.nextMessage();
 
@@ -191,7 +188,7 @@
 
       if (serverMessage != null)
       {
-         return new Pair<PagePosition, PagedMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
+         return new PagedReferenceImpl(retPos, cache.getMessage(retPos.getMessageNr()));
       }
       else
       {

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-02 22:44:05 UTC (rev 9833)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-03 03:13:23 UTC (rev 9834)
@@ -17,29 +17,26 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.hornetq.api.core.Pair;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReference;
+import org.hornetq.core.paging.cursor.PagedReferenceImpl;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.Transaction;
@@ -158,7 +155,7 @@
       ack(position);
    }
 
-   class CursorIterator implements LinkedListIterator<Pair<PagePosition, PagedMessage>>
+   class CursorIterator implements LinkedListIterator<PagedReferenceImpl>
    {
       PagePosition position = getLastPosition();
 
@@ -170,7 +167,7 @@
       
       /** next element taken on hasNext test.
        *  it has to be delivered on next next operation */
-      Pair<PagePosition, PagedMessage> cachedNext;
+      PagedReferenceImpl cachedNext;
 
       public void repeat()
       {
@@ -194,12 +191,12 @@
       /* (non-Javadoc)
        * @see java.util.Iterator#next()
        */
-      public Pair<PagePosition, PagedMessage> next()
+      public PagedReferenceImpl next()
       {
          
          if (cachedNext != null)
          {
-            Pair<PagePosition, PagedMessage> retPos = cachedNext;
+            PagedReferenceImpl retPos = cachedNext;
             cachedNext = null;
             return retPos;
          }
@@ -215,10 +212,10 @@
                isredelivery = false;
             }
 
-            Pair<PagePosition, PagedMessage> nextPos = moveNext(position);
+            PagedReferenceImpl nextPos = moveNext(position);
             if (nextPos != null)
             {
-               position = nextPos.a;
+               position = nextPos.getPosition();
             }
             return nextPos;
          }
@@ -257,15 +254,15 @@
       }
    }
 
-   private Pair<PagePosition, PagedMessage> getMessage(PagePosition pos) throws Exception
+   private PagedReferenceImpl getMessage(PagePosition pos) throws Exception
    {
-      return new Pair<PagePosition, PagedMessage>(pos, cursorProvider.getMessage(pos));
+      return new PagedReferenceImpl(pos, cursorProvider.getMessage(pos));
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#iterator()
     */
-   public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator()
+   public LinkedListIterator<PagedReferenceImpl> iterator()
    {
       return new CursorIterator();
    }
@@ -275,11 +272,11 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
     */
-   public synchronized Pair<PagePosition, PagedMessage> moveNext(PagePosition position) throws Exception
+   public synchronized PagedReferenceImpl moveNext(PagePosition position) throws Exception
    {
       boolean match = false;
 
-      Pair<PagePosition, PagedMessage> message = null;
+      PagedReferenceImpl message = null;
 
       PagePosition tmpPosition = position;
 
@@ -294,25 +291,24 @@
          }
          else
          {
-            PageCursorInfo info = getPageInfo(message.a, false);
-            if (info != null && info.isRemoved(message.a))
+            PageCursorInfo info = getPageInfo(message.getPosition(), false);
+            if (info != null && info.isRemoved(message.getPosition()))
             {
-               tmpPosition = message.a;
+               tmpPosition = message.getPosition();
                valid = false;
             }
          }
          if (valid)
          {
-            tmpPosition = message.a;
+            tmpPosition = message.getPosition();
 
-            match = match(message.b.getMessage());
+            match = match(message.getMessage());
 
             if (!match)
             {
-               processACK(message.a);
+               processACK(message.getPosition());
             }
          }
-
       }
       while (message != null && !match);
 
@@ -337,9 +333,13 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
     */
+   public void ack(final PagedReference position) throws Exception
+   {
+      ack(position.getPosition());
+   }
+   
    public void ack(final PagePosition position) throws Exception
    {
-
       // if we are dealing with a persistent cursor
       if (persistent)
       {
@@ -371,6 +371,12 @@
 
    }
 
+
+   public void ackTx(final Transaction tx, final PagedReference position) throws Exception
+   {
+      ackTx(tx, position.getPosition());
+   }
+
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
     */
@@ -556,21 +562,21 @@
                   // looking for holes on the ack list for redelivery
                   while (true)
                   {
-                     Pair<PagePosition, PagedMessage> msgCheck = cursorProvider.getNext(this, tmpPos);
+                     PagedReferenceImpl msgCheck = cursorProvider.getNext(this, tmpPos);
 
                      positions = getPageInfo(tmpPos);
 
                      // end of the hole, we can finish processing here
                      // It may be also that the next was just a next page, so we just ignore it
-                     if (msgCheck == null || msgCheck.a.equals(pos))
+                     if (msgCheck == null || msgCheck.getPosition().equals(pos))
                      {
                         break;
                      }
                      else
                      {
-                        if (match(msgCheck.b.getMessage()))
+                        if (match(msgCheck.getMessage()))
                         {
-                           redeliver(msgCheck.a);
+                           redeliver(msgCheck.getPosition());
                         }
                         else
                         {
@@ -580,7 +586,7 @@
                            positions.confirmed.incrementAndGet();
                         }
                      }
-                     tmpPos = msgCheck.a;
+                     tmpPos = msgCheck.getPosition();
                   }
                }
             }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-02 22:44:05 UTC (rev 9833)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-03 03:13:23 UTC (rev 9834)
@@ -21,7 +21,6 @@
 import junit.framework.Assert;
 
 import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
@@ -30,12 +29,13 @@
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReferenceImpl;
 import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
 import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.paging.impl.PagingStoreImpl;
 import org.hornetq.core.persistence.StorageManager;
@@ -120,14 +120,14 @@
 
       PageSubscription cursor = lookupPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
 
-      Pair<PagePosition, PagedMessage> msg;
+      PagedReferenceImpl msg;
 
-      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
       int key = 0;
       while ((msg = iterator.next()) != null)
       {
-         assertEquals(key++, msg.b.getMessage().getIntProperty("key").intValue());
-         cursor.ack(msg.a);
+         assertEquals(key++, msg.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msg.getPosition());
       }
       assertEquals(NUM_MESSAGES, key);
 
@@ -205,30 +205,30 @@
 
       queue.getPageSubscription().close();
 
-      Pair<PagePosition, PagedMessage> msg;
+      PagedReferenceImpl msg;
 
-      LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorEven = cursorEven.iterator();
+      LinkedListIterator<PagedReferenceImpl> iteratorEven = cursorEven.iterator();
 
-      LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorOdd = cursorOdd.iterator();
+      LinkedListIterator<PagedReferenceImpl> iteratorOdd = cursorOdd.iterator();
 
       int key = 0;
       while ((msg = iteratorEven.next()) != null)
       {
          System.out.println("Received" + msg);
-         assertEquals(key, msg.b.getMessage().getIntProperty("key").intValue());
-         assertTrue(msg.b.getMessage().getBooleanProperty("even").booleanValue());
+         assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
+         assertTrue(msg.getMessage().getBooleanProperty("even").booleanValue());
          key += 2;
-         cursorEven.ack(msg.a);
+         cursorEven.ack(msg.getPosition());
       }
       assertEquals(NUM_MESSAGES, key);
 
       key = 1;
       while ((msg = iteratorOdd.next()) != null)
       {
-         assertEquals(key, msg.b.getMessage().getIntProperty("key").intValue());
-         assertFalse(msg.b.getMessage().getBooleanProperty("even").booleanValue());
+         assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
+         assertFalse(msg.getMessage().getBooleanProperty("even").booleanValue());
          key += 2;
-         cursorOdd.ack(msg.a);
+         cursorOdd.ack(msg.getPosition());
       }
       assertEquals(NUM_MESSAGES + 1, key);
 
@@ -285,18 +285,18 @@
       System.out.println("Cursor: " + cursor);
       cursorProvider.printDebug();
 
-      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
 
       for (int i = 0; i < 1000; i++)
       {
          System.out.println("Reading Msg : " + i);
-         Pair<PagePosition, PagedMessage> msg = iterator.next();
+         PagedReferenceImpl msg = iterator.next();
          assertNotNull(msg);
-         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
 
          if (i < firstPageSize)
          {
-            cursor.ack(msg.a);
+            cursor.ack(msg);
          }
       }
       cursorProvider.printDebug();
@@ -319,11 +319,11 @@
       for (int i = firstPageSize; i < NUM_MESSAGES; i++)
       {
          System.out.println("Received " + i);
-         Pair<PagePosition, PagedMessage> msg = iterator.next();
+         PagedReferenceImpl msg = iterator.next();
          assertNotNull(msg);
-         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
 
-         cursor.ack(msg.a);
+         cursor.ack(msg);
 
          OperationContextImpl.getContext(null).waitCompletion();
 
@@ -361,14 +361,14 @@
                                            .getSubscription(queue.getID());
 
       System.out.println("Cursor: " + cursor);
-      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
       for (int i = 0; i < 100; i++)
       {
-         Pair<PagePosition, PagedMessage> msg = iterator.next();
-         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+         PagedReferenceImpl msg = iterator.next();
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
          if (i < 10 || i > 20)
          {
-            cursor.ack(msg.a);
+            cursor.ack(msg);
          }
       }
 
@@ -383,16 +383,16 @@
 
       for (int i = 10; i <= 20; i++)
       {
-         Pair<PagePosition, PagedMessage> msg = iterator.next();
-         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
-         cursor.ack(msg.a);
+         PagedReferenceImpl msg = iterator.next();
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msg);
       }
 
       for (int i = 100; i < NUM_MESSAGES; i++)
       {
-         Pair<PagePosition, PagedMessage> msg = iterator.next();
-         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
-         cursor.ack(msg.a);
+         PagedReferenceImpl msg = iterator.next();
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msg);
       }
 
       server.stop();
@@ -422,15 +422,15 @@
 
       Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
 
-      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
 
       for (int i = 0; i < 100; i++)
       {
-         Pair<PagePosition, PagedMessage> msg = iterator.next();
-         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+         PagedReferenceImpl msg = iterator.next();
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
          if (i < 10 || i > 20)
          {
-            cursor.ackTx(tx, msg.a);
+            cursor.ackTx(tx, msg);
          }
       }
 
@@ -449,16 +449,16 @@
 
       for (int i = 10; i <= 20; i++)
       {
-         Pair<PagePosition, PagedMessage> msg = iterator.next();
-         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
-         cursor.ackTx(tx, msg.a);
+         PagedReferenceImpl msg = iterator.next();
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+         cursor.ackTx(tx, msg);
       }
 
       for (int i = 100; i < NUM_MESSAGES; i++)
       {
-         Pair<PagePosition, PagedMessage> msg = iterator.next();
-         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
-         cursor.ackTx(tx, msg.a);
+         PagedReferenceImpl msg = iterator.next();
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+         cursor.ackTx(tx, msg);
       }
 
       tx.commit();
@@ -490,7 +490,7 @@
 
       System.out.println("Cursor: " + cursor);
 
-      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
 
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
@@ -506,11 +506,11 @@
 
          Assert.assertTrue(pageStore.page(msg));
 
-         Pair<PagePosition, PagedMessage> readMessage = iterator.next();
+         PagedReferenceImpl readMessage = iterator.next();
 
          assertNotNull(readMessage);
 
-         assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
+         assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
 
          assertNull(iterator.next());
       }
@@ -544,11 +544,11 @@
             Assert.assertTrue(pageStore.page(msg));
          }
 
-         Pair<PagePosition, PagedMessage> readMessage = iterator.next();
+         PagedReferenceImpl readMessage = iterator.next();
 
          assertNotNull(readMessage);
 
-         assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
+         assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
       }
 
       server.stop();
@@ -580,20 +580,20 @@
             Assert.assertTrue(pageStore.page(msg));
          }
 
-         Pair<PagePosition, PagedMessage> readMessage = iterator.next();
+         PagedReferenceImpl readMessage = iterator.next();
 
          assertNotNull(readMessage);
 
-         cursor.ack(readMessage.a);
+         cursor.ack(readMessage);
 
-         assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
+         assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
       }
 
-      Pair<PagePosition, PagedMessage> readMessage = iterator.next();
+      PagedReferenceImpl readMessage = iterator.next();
 
-      assertEquals(NUM_MESSAGES * 3, readMessage.b.getMessage().getIntProperty("key").intValue());
+      assertEquals(NUM_MESSAGES * 3, readMessage.getMessage().getIntProperty("key").intValue());
 
-      cursor.ack(readMessage.a);
+      cursor.ack(readMessage);
 
       server.getStorageManager().waitOnOperations();
 
@@ -647,7 +647,7 @@
                                            .getPageStore(ADDRESS)
                                            .getCursorProvier()
                                            .getSubscription(queue.getID());
-      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
 
       System.out.println("Cursor: " + cursor);
 
@@ -676,10 +676,10 @@
       // First consume what's already there without any tx as nothing was committed
       for (int i = 300; i < 400; i++)
       {
-         Pair<PagePosition, PagedMessage> pos = iterator.next();
+         PagedReferenceImpl pos = iterator.next();
          assertNotNull("Null at position " + i, pos);
-         assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
-         cursor.ack(pos.a);
+         assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+         cursor.ack(pos);
       }
 
       assertNull(iterator.next());
@@ -693,10 +693,10 @@
       // Second:after pgtxCommit was done
       for (int i = 200; i < 300; i++)
       {
-         Pair<PagePosition, PagedMessage> pos = iterator.next();
+         PagedReferenceImpl pos = iterator.next();
          assertNotNull(pos);
-         assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
-         cursor.ack(pos.a);
+         assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+         cursor.ack(pos);
       }
 
       assertNull(iterator.next());
@@ -724,15 +724,15 @@
 
       queue.getPageSubscription().close();
 
-      Pair<PagePosition, PagedMessage> msg;
-      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
-      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator2 = cursor.iterator();
+      PagedReferenceImpl msg;
+      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+      LinkedListIterator<PagedReferenceImpl> iterator2 = cursor.iterator();
 
       int key = 0;
       while ((msg = iterator.next()) != null)
       {
-         assertEquals(key++, msg.b.getMessage().getIntProperty("key").intValue());
-         cursor.ack(msg.a);
+         assertEquals(key++, msg.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msg);
       }
       assertEquals(NUM_MESSAGES, key);
 
@@ -741,7 +741,7 @@
       for (int i = 0; i < 10; i++)
       {
          msg = iterator2.next();
-         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
       }
 
       assertSame(cursor2.getProvider(), cursorProvider);
@@ -803,13 +803,13 @@
       msg = null;
 
       cache = null;
-      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
 
-      Pair<PagePosition, PagedMessage> msgCursor = null;
+      PagedReferenceImpl msgCursor = null;
       while ((msgCursor = iterator.next()) != null)
       {
-         assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
-         cursor.ack(msgCursor.a);
+         assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msgCursor);
       }
       assertEquals(NUM_MESSAGES, key);
 
@@ -848,12 +848,12 @@
 
       cache = null;
 
-      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
 
-      Pair<PagePosition, PagedMessage> msgCursor = null;
+      PagedReferenceImpl msgCursor = null;
       while ((msgCursor = iterator.next()) != null)
       {
-         assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
+         assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
       }
       assertEquals(NUM_MESSAGES, key);
 
@@ -869,8 +869,8 @@
       iterator = cursor.iterator();
       while ((msgCursor = iterator.next()) != null)
       {
-         assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
-         cursor.ack(msgCursor.a);
+         assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msgCursor);
       }
 
       forceGC();
@@ -902,29 +902,29 @@
 
       PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
 
-      Iterator<Pair<PagePosition, PagedMessage>> iter = cursor.iterator();
+      Iterator<PagedReferenceImpl> iter = cursor.iterator();
       
-      Iterator<Pair<PagePosition, PagedMessage>> iter2 = cursor.iterator();
+      Iterator<PagedReferenceImpl> iter2 = cursor.iterator();
       
       assertTrue(iter.hasNext());
       
-      Pair<PagePosition, PagedMessage> msg1 = iter.next();
+      PagedReferenceImpl msg1 = iter.next();
       
-      Pair<PagePosition, PagedMessage> msg2 = iter2.next();
+      PagedReferenceImpl msg2 = iter2.next();
       
-      assertEquals(tstProperty(msg1.b.getMessage()), tstProperty(msg2.b.getMessage()));
+      assertEquals(tstProperty(msg1.getMessage()), tstProperty(msg2.getMessage()));
       
-      System.out.println("property = " + tstProperty(msg1.b.getMessage()));
+      System.out.println("property = " + tstProperty(msg1.getMessage()));
 
       msg1 = iter.next();
       
-      assertEquals(1, tstProperty(msg1.b.getMessage()));
+      assertEquals(1, tstProperty(msg1.getMessage()));
       
       iter.remove();
       
       msg2 = iter2.next();
       
-      assertEquals(2, tstProperty(msg2.b.getMessage()));
+      assertEquals(2, tstProperty(msg2.getMessage()));
       
       assertTrue(iter2.hasNext());
       



More information about the hornetq-commits mailing list