[hornetq-commits] JBoss hornetq SVN: r9744 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Oct 4 15:35:39 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-04 15:35:39 -0400 (Mon, 04 Oct 2010)
New Revision: 9744

Removed:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java
Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Storage of cursors' ack initial implementation

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-04 19:35:39 UTC (rev 9744)
@@ -32,5 +32,7 @@
 
    void ack(PagePosition position);
 
+   void ack(long tx, PagePosition position);
+
    void returnElement(PagePosition position);
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-04 19:35:39 UTC (rev 9744)
@@ -41,6 +41,17 @@
 
    PagingStore getAssociatedStore();
 
+   /**
+    * 
+    * @param queueId The cursorID should be the same as the queueId associated for persistence
+    * @return
+    */
+   PageCursor createCursor(long queueId);
+   
+   /**
+    * Create a non persistent cursor, usually associated with browsing
+    * @return
+    */
    PageCursor createCursor();
 
    // PageCursor recoverCursor(PagePosition position);

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java	2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java	2010-10-04 19:35:39 UTC (rev 9744)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.paging.cursor;
 
+
 /**
  * A PagePosition
  *
@@ -23,9 +24,10 @@
 public interface PagePosition extends Comparable<PagePosition>
 {
 
+   // The recordID associated during ack
    long getRecordID();
 
-   // TODO: this belongs somewhere else
+   // The recordID associated during ack
    void setRecordID(long recordID);
 
    long getPageNr();

Deleted: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java	2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java	2010-10-04 19:35:39 UTC (rev 9744)
@@ -1,28 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.paging.cursor;
-
-import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
-
-/**
- * A StorageCursor
- *
- * @author clebertsuconic
- *
- *
- */
-public interface StorageCursor
-{
-   void storeCursorInitialPosition(PagePositionImpl position);
-}

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-04 19:35:39 UTC (rev 9744)
@@ -18,7 +18,7 @@
 import org.hornetq.core.paging.cursor.PageCursor;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.core.paging.cursor.StorageCursor;
+import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.ServerMessage;
 
 /**
@@ -35,10 +35,12 @@
 
    // Attributes ----------------------------------------------------
 
-   private StorageCursor store;
+   private StorageManager store;
 
+   private final long cursorId;
+
    private PagingStore pageStore;
-   
+
    private final PageCursorProvider cursorProvider;
 
    private volatile PagePosition lastPosition;
@@ -47,11 +49,12 @@
 
    // Constructors --------------------------------------------------
 
-   public PageCursorImpl(PageCursorProvider cursorProvider, PagingStore pageStore, StorageCursor store)
+   public PageCursorImpl(PageCursorProvider cursorProvider, PagingStore pageStore, StorageManager store, long cursorId)
    {
       this.pageStore = pageStore;
       this.store = store;
       this.cursorProvider = cursorProvider;
+      this.cursorId = cursorId;
    }
 
    // Public --------------------------------------------------------
@@ -65,15 +68,15 @@
       {
          lastPosition = recoverLastPosition();
       }
-       
-      Pair<PagePosition,ServerMessage> message = null;
+
+      Pair<PagePosition, ServerMessage> message = null;
       do
       {
-        message = cursorProvider.getAfter(lastPosition);
-        if (message != null)
-        {
-           lastPosition = message.a;
-        }
+         message = cursorProvider.getAfter(lastPosition);
+         if (message != null)
+         {
+            lastPosition = message.a;
+         }
       }
       while (message != null && !match(message.b));
 
@@ -86,8 +89,14 @@
    public void ack(PagePosition position)
    {
       // TODO Auto-generated method stub
-
    }
+   
+   public void ack(long tx, PagePosition position)
+   {
+      
+   }
+   
+   
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
@@ -110,7 +119,7 @@
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
-   
+
    protected boolean match(ServerMessage message)
    {
       return true;
@@ -123,7 +132,7 @@
       long firstPage = pageStore.getFirstPage();
       return new PagePositionImpl(firstPage, -1);
    }
-   
+
    // Inner classes -------------------------------------------------
 
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-04 19:35:39 UTC (rev 9744)
@@ -66,9 +66,15 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
     */
+   public PageCursor createCursor(long cursorId)
+   {
+      return new PageCursorImpl(this, pagingStore, storageManager, cursorId);
+   }
+   
+   
    public PageCursor createCursor()
    {
-      return new PageCursorImpl(this, pagingStore, null);
+      return new PageCursorImpl(this, pagingStore, storageManager, 0);
    }
 
    /* (non-Javadoc)

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java	2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java	2010-10-04 19:35:39 UTC (rev 9744)
@@ -13,7 +13,9 @@
 
 package org.hornetq.core.paging.cursor.impl;
 
+import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.utils.DataConstants;
 
 /**
  * A PagePosition
@@ -43,6 +45,15 @@
    }
 
    /**
+    * No-argument constructor; it takes no parameters and its body is
+    * empty.
+    */
+   public PagePositionImpl()
+   {
+
+   }
+
+   /**
     * @return the recordID
     */
    public long getRecordID()
@@ -115,5 +126,4 @@
    {
       return this.pageNr == pos.getPageNr() && this.getRecordID() - pos.getRecordID() == 1;
    }
-
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java	2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java	2010-10-04 19:35:39 UTC (rev 9744)
@@ -13,7 +13,6 @@
 
 package org.hornetq.core.persistence;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
@@ -27,6 +26,7 @@
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.config.PersistedAddressSetting;
 import org.hornetq.core.persistence.config.PersistedRoles;
 import org.hornetq.core.postoffice.Binding;
@@ -98,6 +98,8 @@
    void deleteMessage(long messageID) throws Exception;
 
    void storeAcknowledge(long queueID, long messageID) throws Exception;
+   
+   void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception;
 
    void updateDeliveryCount(MessageReference ref) throws Exception;
 
@@ -113,6 +115,8 @@
 
    void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception;
 
+   void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception;
+
    void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception;
 
    void deleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-10-04 19:35:39 UTC (rev 9744)
@@ -50,6 +50,8 @@
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
 import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.persistence.OperationContext;
@@ -134,6 +136,8 @@
 
    public static final byte HEURISTIC_COMPLETION = 38;
 
+   public static final byte ACKNOWLEDGE_PAGING = 39;
+
    private UUID persistentID;
 
    private final BatchingIDGenerator idGenerator;
@@ -508,7 +512,16 @@
                                         syncNonTransactional,
                                         getContext(syncNonTransactional));
    }
+   
+   public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
+   {
+     long ackID = idGenerator.generateID();
+     position.setRecordID(ackID);
+     messageJournal.appendAddRecord(ackID, ACKNOWLEDGE_PAGING, new CursorAckRecordEncoding(queueID, position), syncNonTransactional, getContext(syncNonTransactional));
+   }
 
+
+
    public void deleteMessage(final long messageID) throws Exception
    {
       // Messages are deleted on postACK, one after another.
@@ -607,6 +620,16 @@
                                                      new RefEncoding(queueID));
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledgeTransactional(long, long, org.hornetq.core.paging.cursor.PagePosition)
+    */
+   public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception
+   {
+      long ackID = idGenerator.generateID();
+      position.setRecordID(ackID);
+      messageJournal.appendAddRecordTransactional(txID, ackID, ACKNOWLEDGE_PAGING, new CursorAckRecordEncoding(queueID, position));
+   }
+
    public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
    {
       long id = generateUniqueID();
@@ -2192,6 +2215,7 @@
       }
 
    }
+   
 
    private static final class AddMessageRecord
    {
@@ -2209,6 +2233,50 @@
       boolean referenced = false;
    }
 
+
+
+   private static final class CursorAckRecordEncoding implements EncodingSupport
+   {
+      public CursorAckRecordEncoding(final long queueID, final PagePosition position)
+      {
+         this.queueID = queueID;
+         this.position = position;
+      }
+
+      long queueID;
+
+      PagePosition position;
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+       */
+      public int getEncodeSize()
+      {
+         return DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+       */
+      public void encode(HornetQBuffer buffer)
+      {
+         buffer.writeLong(queueID);
+         buffer.writeLong(position.getPageNr());
+         buffer.writeInt(position.getMessageNr());
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+       */
+      public void decode(HornetQBuffer buffer)
+      {
+         queueID = buffer.readLong();
+         long pageNR = buffer.readLong();
+         int messageNR = buffer.readInt();
+         this.position = new PagePositionImpl(pageNR, messageNR);
+      }
+   }
+
    private class LargeMessageTXFailureCallback implements TransactionFailureCallback
    {
       private final Map<Long, ServerMessage> messages;
@@ -2245,5 +2313,4 @@
       }
 
    }
-
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2010-10-04 19:35:39 UTC (rev 9744)
@@ -31,6 +31,7 @@
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.QueueBindingInfo;
@@ -450,4 +451,22 @@
    {
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledge(long, org.hornetq.core.paging.cursor.PagePosition)
+    */
+   public void storeCursorAcknowledge(long queueID, PagePosition position)
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledgeTransactional(long, long, org.hornetq.core.paging.cursor.PagePosition)
+    */
+   public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position)
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
 }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-10-04 19:35:39 UTC (rev 9744)
@@ -46,6 +46,7 @@
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.PagingStoreFactory;
+import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.paging.impl.PagingStoreImpl;
 import org.hornetq.core.paging.impl.TestSupportPageStore;
@@ -1501,6 +1502,24 @@
       {
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledge(long, org.hornetq.core.paging.cursor.PagePosition)
+       */
+      public void storeCursorAcknowledge(long queueID, PagePosition position)
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledgeTransactional(long, long, org.hornetq.core.paging.cursor.PagePosition)
+       */
+      public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position)
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory



More information about the hornetq-commits mailing list