[hornetq-commits] JBoss hornetq SVN: r10045 - in trunk: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 16 14:47:44 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-16 14:47:43 -0500 (Thu, 16 Dec 2010)
New Revision: 10045

Modified:
   trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
   trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
Log:
Page Counters commit 2

Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java	2010-12-16 15:49:32 UTC (rev 10044)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java	2010-12-16 19:47:43 UTC (rev 10045)
@@ -25,20 +25,25 @@
 public interface PageSubscriptionCounter
 {
 
-   public abstract long getValue();
+   long getValue();
 
-   public abstract void increment(Transaction tx, int add) throws Exception;
+   void increment(Transaction tx, int add) throws Exception;
 
-   public abstract void loadValue(final long recordValueID, final long value);
+   void loadValue(final long recordValueID, final long value);
+   
+   void loadInc(final long recordInd, final int add);
+   
+   void replayIncrement(Transaction tx, long recordID, int add);
+   
+   /** This will process the reload */
+   void processReload();
 
-   public abstract void incrementProcessed(long id, int variance);
-
    /**
     * 
     * This method is also used by Journal.loadMessageJournal
     * @param id
     * @param variance
     */
-   public abstract void addInc(long id, int variance);
+   void addInc(long id, int variance);
 
 }
\ No newline at end of file

Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2010-12-16 15:49:32 UTC (rev 10044)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2010-12-16 19:47:43 UTC (rev 10045)
@@ -19,6 +19,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.hornetq.api.core.Pair;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
 import org.hornetq.core.persistence.StorageManager;
@@ -58,6 +59,8 @@
    private final AtomicLong value = new AtomicLong(0);
 
    private final LinkedList<Long> incrementRecords = new LinkedList<Long>();
+   
+   private LinkedList<Pair<Long, Integer>> loadList;
 
    private final Executor executor;
    
@@ -96,6 +99,22 @@
     */
    public void increment(Transaction tx, int add) throws Exception
    {
+      tx.setContainsPersistent();
+      
+      long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
+
+      replayIncrement(tx, id, add);
+
+   }
+
+   /**
+    * This method will install the prepared TXs
+    * @param tx
+    * @param recordID
+    * @param add
+    */
+   public void replayIncrement(Transaction tx, long recordID, int add)
+   {
       CounterOperations oper = (CounterOperations)tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
 
       if (oper == null)
@@ -105,18 +124,16 @@
          tx.addOperation(oper);
       }
 
-      long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
-
-      oper.operations.add(new ItemOper(this, id, add));
-
+      oper.operations.add(new ItemOper(this, recordID, add));
    }
    
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#loadValue(long, long)
     */
-   public synchronized void loadValue(final long recordValueID, final long value)
+   public synchronized void loadValue(final long recordID, final long value)
    {
       this.value.set(value);
+      this.recordID = recordID;
    }
    
    
@@ -124,9 +141,9 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#incrementProcessed(long, int)
     */
-   public synchronized void incrementProcessed(long id, int variance)
+   public synchronized void incrementProcessed(long id, int add)
    {
-      addInc(id, variance);
+      addInc(id, add);
       if (incrementRecords.size() > FLUSH_COUNTER)
       {
          executor.execute(cleanupCheck);
@@ -135,6 +152,33 @@
    }
 
    /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageSubscriptionCounter#loadInc(long, int)
+    */
+   public void loadInc(long id, int add)
+   {
+      if (loadList == null)
+      {
+         loadList = new LinkedList<Pair<Long,Integer>>();
+      }
+      
+      loadList.add(new Pair<Long, Integer>(id, add));
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageSubscriptionCounter#processReload()
+    */
+   public void processReload()
+   {
+      for (Pair<Long, Integer> incElement : loadList)
+      {
+         value.addAndGet(incElement.b);
+         incrementRecords.add(incElement.a);
+      }
+      loadList.clear();
+      loadList = null;
+   }
+
+   /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#addInc(long, int)
     */
    public void addInc(long id, int variance)
@@ -211,14 +255,14 @@
    static class ItemOper
    {
 
-      public ItemOper(PageSubscriptionCounter counter, long id, int add)
+      public ItemOper(PageSubscriptionCounterImpl counter, long id, int add)
       {
          this.counter = counter;
          this.id = id;
          this.ammount = add;
       }
 
-      PageSubscriptionCounter counter;
+      PageSubscriptionCounterImpl counter;
 
       long id;
 

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-12-16 15:49:32 UTC (rev 10044)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-12-16 19:47:43 UTC (rev 10045)
@@ -52,8 +52,10 @@
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
 import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
 import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.persistence.GroupingInfo;
@@ -138,9 +140,9 @@
    public static final byte HEURISTIC_COMPLETION = 38;
 
    public static final byte ACKNOWLEDGE_CURSOR = 39;
-   
+
    public static final byte PAGE_CURSOR_COUNTER_VALUE = 40;
-   
+
    public static final byte PAGE_CURSOR_COUNTER_INC = 41;
 
    private UUID persistentID;
@@ -278,7 +280,7 @@
       }
       else
       {
-         idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);      
+         idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
       }
       Journal localMessage = new JournalImpl(config.getJournalFileSize(),
                                              config.getJournalMinFiles(),
@@ -440,7 +442,7 @@
       }
 
       LargeServerMessageImpl largeMessage = (LargeServerMessageImpl)createLargeMessage();
-      
+
       largeMessage.copyHeadersAndProperties(message);
 
       largeMessage.setMessageID(id);
@@ -496,16 +498,18 @@
                                         syncNonTransactional,
                                         getContext(syncNonTransactional));
    }
-   
+
    public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
    {
-     long ackID = idGenerator.generateID();
-     position.setRecordID(ackID);
-     messageJournal.appendAddRecord(ackID, ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position), syncNonTransactional, getContext(syncNonTransactional));
+      long ackID = idGenerator.generateID();
+      position.setRecordID(ackID);
+      messageJournal.appendAddRecord(ackID,
+                                     ACKNOWLEDGE_CURSOR,
+                                     new CursorAckRecordEncoding(queueID, position),
+                                     syncNonTransactional,
+                                     getContext(syncNonTransactional));
    }
 
-
-
    public void deleteMessage(final long messageID) throws Exception
    {
       // Messages are deleted on postACK, one after another.
@@ -592,8 +596,7 @@
    {
       messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
                                         JournalStorageManager.PAGE_TRANSACTION,
-                                        new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
-                                                                 depages),
+                                        new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
                                         syncNonTransactional,
                                         getContext(syncNonTransactional));
    }
@@ -621,9 +624,11 @@
    {
       long ackID = idGenerator.generateID();
       position.setRecordID(ackID);
-      messageJournal.appendAddRecordTransactional(txID, ackID, ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position));
+      messageJournal.appendAddRecordTransactional(txID,
+                                                  ackID,
+                                                  ACKNOWLEDGE_CURSOR,
+                                                  new CursorAckRecordEncoding(queueID, position));
    }
-   
 
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#deleteCursorAcknowledgeTransactional(long, long)
@@ -633,7 +638,6 @@
       messageJournal.appendDeleteRecordTransactional(txID, ackID);
    }
 
-
    public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
    {
       long id = generateUniqueID();
@@ -807,6 +811,8 @@
 
       Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long, Map<Long, AddMessageRecord>>();
 
+      Map<Long, PageSubscription> pageSubscriptions = new HashMap<Long, PageSubscription>();
+
       final int totalSize = records.size();
 
       for (int reccount = 0; reccount < totalSize; reccount++)
@@ -1011,25 +1017,62 @@
             {
                CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
                encoding.decode(buff);
-               
+
                encoding.position.setRecordID(record.id);
-               
-               QueueBindingInfo queueInfo = queueInfos.get(encoding.queueID);
-               
-               if (queueInfo != null)
+
+               PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+               if (sub != null)
                {
-                  SimpleString address = queueInfo.getAddress();
-                  PagingStore store = pagingManager.getPageStore(address);
-                  PageSubscription cursor = store.getCursorProvier().getSubscription(encoding.queueID);
-                  cursor.reloadACK(encoding.position);
+                  sub.reloadACK(encoding.position);
                }
                else
                {
                   log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
                }
-               
+
                break;
             }
+            case PAGE_CURSOR_COUNTER_VALUE:
+            {
+               PageCountRecord encoding = new PageCountRecord();
+
+               encoding.decode(buff);
+
+               PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+               if (sub != null)
+               {
+                  sub.getCounter().loadValue(record.id, encoding.value);
+               }
+               else
+               {
+                  log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+               }
+
+               break;
+            }
+
+            case PAGE_CURSOR_COUNTER_INC:
+            {
+               PageCountRecordInc encoding = new PageCountRecordInc();
+
+               encoding.decode(buff);
+
+
+               PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+               if (sub != null)
+               {
+                  sub.getCounter().loadValue(record.id, encoding.value);
+               }
+               else
+               {
+                  log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+               }
+
+               break;
+            }
             default:
             {
                throw new IllegalStateException("Invalid record type " + recordType);
@@ -1083,7 +1126,7 @@
          }
       }
 
-      loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, preparedTransactions, duplicateIDMap);
+      loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions);
 
       for (LargeServerMessage msg : largeMessages)
       {
@@ -1110,7 +1153,7 @@
             }
          }
       }
-      
+
       // To recover positions on Iterators
       if (pagingManager != null)
       {
@@ -1132,6 +1175,35 @@
       return info;
    }
 
+   /**
+    * @param queueID
+    * @param pageSubscriptions
+    * @param queueInfos
+    * @return
+    */
+   private PageSubscription locateSubscription(final long queueID,
+                                               final Map<Long, PageSubscription> pageSubscriptions,
+                                               final Map<Long, QueueBindingInfo> queueInfos,
+                                               final PagingManager pagingManager) throws Exception
+   {
+
+      PageSubscription subs = pageSubscriptions.get(queueID);
+      if (subs == null)
+      {
+         QueueBindingInfo queueInfo = queueInfos.get(queueID);
+
+         if (queueInfo != null)
+         {
+            SimpleString address = queueInfo.getAddress();
+            PagingStore store = pagingManager.getPageStore(address);
+            subs = store.getCursorProvier().getSubscription(queueID);
+            pageSubscriptions.put(queueID, subs);
+         }
+      }
+      
+      return subs;
+   }
+
    // grouping handler operations
    public void addGrouping(final GroupBinding groupBinding) throws Exception
    {
@@ -1170,8 +1242,6 @@
    {
       bindingsJournal.appendDeleteRecord(queueBindingID, true);
    }
-   
-   
 
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#storePageCounterAdd(long, long, int)
@@ -1179,18 +1249,23 @@
    public long storePageCounterInc(long txID, long queueID, int value) throws Exception
    {
       long recordID = idGenerator.generateID();
-      messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.PAGE_CURSOR_COUNTER_INC, new PageCountRecord(queueID, value));
+      messageJournal.appendAddRecordTransactional(txID,
+                                                  recordID,
+                                                  JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
+                                                  new PageCountRecordInc(queueID, value));
       return recordID;
    }
 
-
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
     */
    public long storePageCounter(long txID, long queueID, long value) throws Exception
    {
       long recordID = idGenerator.generateID();
-      messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value));
+      messageJournal.appendAddRecordTransactional(txID,
+                                                  recordID,
+                                                  JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
+                                                  new PageCountRecord(queueID, value));
       return recordID;
    }
 
@@ -1209,9 +1284,7 @@
    {
       messageJournal.appendDeleteRecordTransactional(txID, recordID);
    }
-   
-   
-   
+
    public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
                                                     final List<GroupingInfo> groupingInfos) throws Exception
    {
@@ -1458,8 +1531,10 @@
                                          final PagingManager pagingManager,
                                          final ResourceManager resourceManager,
                                          final Map<Long, Queue> queues,
+                                         final Map<Long, QueueBindingInfo> queueInfos,
                                          final List<PreparedTransactionInfo> preparedTransactions,
-                                         final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+                                         final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+                                         final Map<Long, PageSubscription> pageSubscriptions) throws Exception
    {
       // recover prepared transactions
       for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
@@ -1609,6 +1684,34 @@
                   // and make sure the rollback will work well also
                   break;
                }
+               case PAGE_CURSOR_COUNTER_VALUE:
+               {
+                  log.warn("PAGE_CURSOR_COUNTER_VALUE record used on a prepared statement, what shouldn't happen");
+
+                  break;
+               }
+
+               case PAGE_CURSOR_COUNTER_INC:
+               {
+                  PageCountRecordInc encoding = new PageCountRecordInc();
+
+                  encoding.decode(buff);
+
+
+                  PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+                  if (sub != null)
+                  {
+                     sub.getCounter().replayIncrement(tx, record.id, encoding.value);
+                  }
+                  else
+                  {
+                     log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+                  }
+
+                  break;
+               }
+
                default:
                {
                   JournalStorageManager.log.warn("InternalError: Record type " + recordType +
@@ -2297,23 +2400,23 @@
       }
 
    }
-   
+
    private static final class PageCountRecord implements EncodingSupport
    {
-      
+
       PageCountRecord()
       {
-         
+
       }
-      
+
       PageCountRecord(long queueID, long value)
       {
          this.queueID = queueID;
          this.value = value;
       }
-      
+
       long queueID;
-      
+
       long value;
 
       /* (non-Javadoc)
@@ -2341,10 +2444,55 @@
          queueID = buffer.readLong();
          value = buffer.readLong();
       }
-      
-      
+
    }
 
+   private static final class PageCountRecordInc implements EncodingSupport
+   {
+
+      PageCountRecordInc()
+      {
+
+      }
+
+      PageCountRecordInc(long queueID, int value)
+      {
+         this.queueID = queueID;
+         this.value = value;
+      }
+
+      long queueID;
+
+      int value;
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+       */
+      public int getEncodeSize()
+      {
+         return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+       */
+      public void encode(HornetQBuffer buffer)
+      {
+         buffer.writeLong(queueID);
+         buffer.writeInt(value);
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+       */
+      public void decode(HornetQBuffer buffer)
+      {
+         queueID = buffer.readLong();
+         value = buffer.readInt();
+      }
+
+   }
+
    private static final class AddMessageRecord
    {
       public AddMessageRecord(final ServerMessage message)
@@ -2366,7 +2514,7 @@
          this.queueID = queueID;
          this.position = position;
       }
-      
+
       public CursorAckRecordEncoding()
       {
          this.position = new PagePositionImpl();

Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java	2010-12-16 15:49:32 UTC (rev 10044)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java	2010-12-16 19:47:43 UTC (rev 10045)
@@ -119,7 +119,7 @@
       
       counter = locateCounter(queue);
       
-      //assertEquals(1, counter.getValue());
+      assertEquals(1, counter.getValue());
 
    }
 



More information about the hornetq-commits mailing list