[hornetq-commits] JBoss hornetq SVN: r9792 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Oct 15 16:07:40 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-15 16:07:39 -0400 (Fri, 15 Oct 2010)
New Revision: 9792

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Transactions on Cursors

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2010-10-15 20:07:39 UTC (rev 9792)
@@ -14,6 +14,8 @@
 package org.hornetq.core.paging;
 
 import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.transaction.Transaction;
 
@@ -48,4 +50,13 @@
    void increment();
 
    int getNumberOfMessages();
+
+   /**
+    * This method will hold the position to be delivered later in case this transaction is pending.
+    * If the tx is already committed, it will return false, so the caller can deliver it right away.
+    * @param cursor
+    * @param cursorPos
+    * @return true if the caller must not deliver now (the position was held for later delivery or ignored), false if it should be delivered right away
+    */
+   boolean deliverAfterCommit(PageCursor cursor, PagePosition cursorPos);
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-15 20:07:39 UTC (rev 9792)
@@ -65,6 +65,8 @@
 
    Page createPage(final int page) throws Exception;
    
+   PagingManager getPagingManager();
+   
    PageCursorProvider getCursorProvier();
    
    void processReload() throws Exception;

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-15 20:07:39 UTC (rev 9792)
@@ -46,6 +46,12 @@
    void reloadACK(PagePosition position);
    
    /**
+    * To be called when the cursor decided to ignore a position.
+    * @param position
+    */
+   void positionIgnored(PagePosition position);
+   
+   /**
     * To be used to avoid a redelivery of a prepared ACK after load
     * @param position
     */

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-15 20:07:39 UTC (rev 9792)
@@ -57,7 +57,7 @@
     */
    PageCursor createCursor();
 
-   Pair<PagePosition, PagedMessage> getAfter(PageCursor cursor, PagePosition pos) throws Exception;
+   Pair<PagePosition, PagedMessage> getNext(PageCursor cursor, PagePosition pos) throws Exception;
    
    PagedMessage getMessage(PagePosition pos) throws Exception;
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-15 20:07:39 UTC (rev 9792)
@@ -133,7 +133,7 @@
 
       do
       {
-         message = cursorProvider.getAfter(this, lastPosition);
+         message = cursorProvider.getNext(this, lastPosition);
 
          if (message != null)
          {
@@ -217,10 +217,19 @@
     */
    public void reloadPreparedACK(final Transaction tx, final PagePosition position)
    {
-      // internalAdd(position);
       installTXCallback(tx, position);
    }
 
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageCursor#positionIgnored(org.hornetq.core.paging.cursor.PagePosition)
+    */
+   public void positionIgnored(PagePosition position)
+   {
+      processACK(position);
+   }
+
+   
    public void processReload() throws Exception
    {
       if (recoveredACK != null)
@@ -247,7 +256,7 @@
                   // looking for holes on the ack list for redelivery
                   while (true)
                   {
-                     Pair<PagePosition, PagedMessage> msgCheck = cursorProvider.getAfter(this, tmpPos);
+                     Pair<PagePosition, PagedMessage> msgCheck = cursorProvider.getNext(this, tmpPos);
 
                      positions = getPageInfo(tmpPos);
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-15 20:07:39 UTC (rev 9792)
@@ -17,15 +17,17 @@
 import java.util.concurrent.ConcurrentMap;
 
 import org.hornetq.api.core.Pair;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PageCursor;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.ServerMessage;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.SoftValueHashMap;
 import org.jboss.netty.util.internal.ConcurrentHashMap;
@@ -44,9 +46,13 @@
 {
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(PageCursorProviderImpl.class);
+
    // Attributes ----------------------------------------------------
 
    private final PagingStore pagingStore;
+   
+   private final PagingManager pagingManager;
 
    private final StorageManager storageManager;
 
@@ -65,6 +71,7 @@
                                  final ExecutorFactory executorFactory)
    {
       this.pagingStore = pagingStore;
+      this.pagingManager = pagingStore.getPagingManager();
       this.storageManager = storageManager;
       this.executorFactory = executorFactory;
    }
@@ -106,23 +113,47 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
     */
-   public Pair<PagePosition, PagedMessage> getAfter(PageCursor cursor, final PagePosition pos) throws Exception
+   public Pair<PagePosition, PagedMessage> getNext(final PageCursor cursor, PagePosition cursorPos) throws Exception
    {
 
       while(true)
       {
-         Pair<PagePosition, PagedMessage> retPos = internalAfter(pos);
+         Pair<PagePosition, PagedMessage> retPos = internalAfter(cursorPos);
          
-         
-         
-         return retPos;
+         if (retPos == null)
+         {
+            return null;
+         }
+         else
+         if (retPos != null)
+         {
+            cursorPos = retPos.a;
+            if (retPos.b.getTransactionID() != 0)
+            {
+               PageTransactionInfo tx = pagingManager.getTransaction(retPos.b.getTransactionID());
+               if (tx == null)
+               {
+                  log.warn("Couldn't locate page transaction " + retPos.b.getTransactionID() + ", ignoring message on position " + retPos.a);
+                  cursor.positionIgnored(cursorPos);
+               }
+               else
+               {
+                  if (!tx.deliverAfterCommit(cursor, cursorPos))
+                  {
+                     return retPos;
+                  }
+               }
+            }
+            else
+            {
+               return retPos;
+            }
+         }
       }
    }
    
    private Pair<PagePosition, PagedMessage> internalAfter(final PagePosition pos)
    {
-      // TODO: consider page transactions here to avoid receiving an uncommitted message
-      // TODO: consider the case where a full page is ignored because of a TX
       PagePosition retPos = pos.nextMessage();
 
       PageCache cache = getPageCache(pos);

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-10-15 20:07:39 UTC (rev 9792)
@@ -13,12 +13,17 @@
 
 package org.hornetq.core.paging.impl;
 
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Pair;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionOperation;
@@ -46,6 +51,8 @@
    private volatile boolean rolledback = false;
 
    private AtomicInteger numberOfMessages = new AtomicInteger(0);
+   
+   private List<Pair<PageCursor, PagePosition>> lateDeliveries;
 
    // Static --------------------------------------------------------
 
@@ -132,6 +139,15 @@
    public synchronized void commit()
    {
       committed = true;
+      if (lateDeliveries != null) // null unless deliverAfterCommit deferred a position
+      {
+         for (Pair<PageCursor, PagePosition> pos : lateDeliveries)
+         {
+            pos.a.redeliver(pos.b);
+         }
+         lateDeliveries.clear(); // cleanup kept inside the guard: avoids NPE when nothing was deferred
+         lateDeliveries = null;
+      }
    }
 
    public void store(final StorageManager storageManager, PagingManager pagingManager, final Transaction tx) throws Exception
@@ -203,6 +219,32 @@
              ")";
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.PageTransactionInfo#deliverAfterCommit(org.hornetq.core.paging.cursor.PageCursor, org.hornetq.core.paging.cursor.PagePosition)
+    */
+   public synchronized boolean deliverAfterCommit(PageCursor cursor, PagePosition cursorPos)
+   {
+      if (committed)
+      {
+         return false; // already committed: nothing to hold back, caller delivers right away
+      }
+      else
+      if (rolledback)
+      {
+         cursor.positionIgnored(cursorPos); // rolled back: tell the cursor to ignore this position
+         return true; // true here means "do not deliver now", not "will deliver later"
+      }
+      else
+      {
+         if (lateDeliveries == null) // list is created lazily on the first deferred position
+         {
+            lateDeliveries = new LinkedList<Pair<PageCursor, PagePosition>>();
+         }
+         lateDeliveries.add(new Pair<PageCursor, PagePosition>(cursor, cursorPos)); // commit() calls redeliver on each entry
+         return true;
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java	2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java	2010-10-15 20:07:39 UTC (rev 9792)
@@ -50,7 +50,7 @@
 
    private ServerMessage message;
 
-   private long transactionID = -1;
+   private long transactionID = 0;
 
    public PagedMessageImpl(final ServerMessage message, final long transactionID)
    {

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-15 20:07:39 UTC (rev 9792)
@@ -377,6 +377,10 @@
       cursorProvider.processReload();
    }
 
+   public PagingManager getPagingManager()
+   {
+      return pagingManager;
+   }
 
    // HornetQComponent implementation
 

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-15 20:07:39 UTC (rev 9792)
@@ -13,7 +13,9 @@
 
 package org.hornetq.tests.integration.paging;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 
 import junit.framework.Assert;
 
@@ -21,6 +23,7 @@
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.config.Configuration;
+import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PageCursor;
@@ -29,7 +32,9 @@
 import org.hornetq.core.paging.cursor.impl.PageCursorImpl;
 import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.paging.impl.PagingStoreImpl;
+import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
@@ -237,6 +242,8 @@
       
       server.stop();
       
+      OperationContextImpl.clearContext();
+      
       server.start();
       
       cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
@@ -289,6 +296,8 @@
       
       server.stop();
       
+      OperationContextImpl.clearContext();
+      
       server.start();
       
       cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
@@ -359,17 +368,102 @@
    }
    
    
-   public void testRollbackScenariosOnACK() throws Exception
+   public void testPrepareScenarios() throws Exception
    {
+      PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+      pageStore.startPaging();
+
+      final int NUM_MESSAGES = 100;
       
+      final int messageSize = 10 * 1024;
+      
+      
+      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      System.out.println("cursorProvider = " + cursorProvider);
+      
+      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      
+      System.out.println("Cursor: " + cursor);
+      
+      StorageManager storage = this.server.getStorageManager();
+      
+      PageTransactionInfoImpl pgtxRollback = new PageTransactionInfoImpl(storage.generateUniqueID());
+      PageTransactionInfoImpl pgtxForgotten = new PageTransactionInfoImpl(storage.generateUniqueID());
+      PageTransactionInfoImpl pgtxCommit = new PageTransactionInfoImpl(storage.generateUniqueID());
+      
+      this.server.getPagingManager().addTransaction(pgtxRollback);
+      this.server.getPagingManager().addTransaction(pgtxCommit);
+      
+      pgMessages(storage, pageStore, pgtxRollback, 0, NUM_MESSAGES, messageSize);
+      pgMessages(storage, pageStore, pgtxForgotten, 100, NUM_MESSAGES, messageSize);
+      pgMessages(storage, pageStore, pgtxCommit, 200, NUM_MESSAGES, messageSize);
+      
+      addMessages(300, NUM_MESSAGES, messageSize);
+
+
+      // First consume what's already there without any tx as nothing was committed
+      for (int i = 300; i < 400; i++)
+      {
+         Pair<PagePosition, PagedMessage> pos = cursor.moveNext();
+         assertNotNull("Null at position " + i, pos);
+         assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
+         cursor.ack(pos.a);
+      }
+
+      assertNull(cursor.moveNext());
+      
+      pgtxRollback.rollback();
+      pgtxCommit.commit();
+      // Second: after pgtxCommit was done
+      for (int i = 200; i < 300; i++)
+      {
+         Pair<PagePosition, PagedMessage> pos = cursor.moveNext();
+         assertNotNull(pos);
+         assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
+         cursor.ack(pos.a);
+      }
+      
+      
    }
+
+
+   /**
+    * @param storage
+    * @param pageStore
+    * @param pgParameter
+    * @param start
+    * @param NUM_MESSAGES
+    * @param messageSize
+    * @throws Exception
+    */
+   private void pgMessages(StorageManager storage,
+                           PagingStoreImpl pageStore,
+                           PageTransactionInfo pgParameter,
+                           int start,
+                           final int NUM_MESSAGES,
+                           final int messageSize) throws Exception
+   {
+      List<ServerMessage> messages = new ArrayList<ServerMessage>();
+      
+      for (int i = start ; i < start + NUM_MESSAGES; i++)
+      {
+         HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+         ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
+         msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+         msg.putIntProperty("key", i);
+         messages.add(msg);
+      }
+      
+      pageStore.page(messages, pgParameter.getTransactionID());
+   }
    
-   public void testReadRolledBackData() throws Exception
+   public void testRollbackScenariosOnACK() throws Exception
    {
       
    }
    
-   public void testPrepareScenarios() throws Exception
+   public void testReadRolledBackData() throws Exception
    {
       
    }
@@ -398,19 +492,24 @@
    {
       
    }
+   
+   private int addMessages(final int numMessages, final int messageSize) throws Exception
+   {
+      return addMessages(0, numMessages, messageSize);
+   }
 
    /**
     * @param numMessages
     * @param pageStore
     * @throws Exception
     */
-   private int addMessages(final int numMessages, final int messageSize) throws Exception
+   private int addMessages(final int start, final int numMessages, final int messageSize) throws Exception
    {
       PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
 
       pageStore.startPaging();
 
-      for (int i = 0; i < numMessages; i++)
+      for (int i = start; i < start + numMessages; i++)
       {
          if (i % 100 == 0) System.out.println("Paged " + i);
          HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
@@ -464,7 +563,6 @@
 
    protected void tearDown() throws Exception
    {
-      OperationContextImpl.clearContext();
       server.stop();
       super.tearDown();
    }



More information about the hornetq-commits mailing list