[hornetq-commits] JBoss hornetq SVN: r10185 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Feb 7 17:10:54 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-02-07 17:10:53 -0500 (Mon, 07 Feb 2011)
New Revision: 10185

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/utils/LinkedListImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java
Log:
Fixing tests on paging and transactions

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2011-02-07 22:10:53 UTC (rev 10185)
@@ -29,6 +29,8 @@
    boolean isCommit();
 
    boolean isRollback();
+   
+   void setCommitted(boolean committed);
 
    void commit();
 
@@ -44,6 +46,8 @@
    
    void storeUpdate(StorageManager storageManager, PagingManager pagingManager, Transaction tx) throws Exception;
    
+   void reloadUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx, final int increment) throws Exception;
+   
    void storeUpdate(StorageManager storageManager, PagingManager pagingManager) throws Exception;
 
    // To be used after the update was stored or reload
@@ -63,4 +67,5 @@
     * @return true if the message will be delivered later, false if it should be delivered right away
     */
    boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos);
+
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2011-02-07 22:10:53 UTC (rev 10185)
@@ -87,9 +87,12 @@
 
    /**
     * To be called when the cursor decided to ignore a position.
+    * 
     * @param position
     */
    void positionIgnored(PagePosition position);
+   
+   void lateDeliveryRollback(PagePosition position);
 
    /**
     * To be used to avoid a redelivery of a prepared ACK after load

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-02-07 22:10:53 UTC (rev 10185)
@@ -15,7 +15,6 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-02-07 22:10:53 UTC (rev 10185)
@@ -73,7 +73,7 @@
 
    private static void trace(final String message)
    {
-	      PageSubscriptionImpl.log.trace(message);
+      PageSubscriptionImpl.log.trace(message);
    }
 
    private volatile boolean autoCleanup = true;
@@ -99,9 +99,9 @@
    private List<PagePosition> recoveredACK;
 
    private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
-   
+
    private final PageSubscriptionCounter counter;
-   
+
    private final AtomicLong deliveredCount = new AtomicLong(0);
 
    // We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
@@ -178,8 +178,6 @@
       confirmPosition(position);
    }
 
-   
-   
    public long getMessageCount()
    {
       return counter.getValue() - deliveredCount.get();
@@ -189,7 +187,7 @@
    {
       return counter;
    }
-   
+
    public void scheduleCleanupCheck()
    {
       if (autoCleanup)
@@ -278,7 +276,7 @@
                   synchronized (PageSubscriptionImpl.this)
                   {
                      for (PageCursorInfo completePage : completedPages)
-                     {  
+                     {
                         if (isTrace)
                         {
                            PageSubscriptionImpl.trace("Removing page " + completePage.getPageId());
@@ -423,7 +421,7 @@
    public void ackTx(final Transaction tx, final PagedReference reference) throws Exception
    {
       confirmPosition(tx, reference.getPosition());
-      
+
       counter.increment(tx, -1);
 
       PageTransactionInfo txInfo = getPageTransaction(reference);
@@ -480,7 +478,7 @@
          return consumedPages.firstKey();
       }
    }
-   
+
    public void addPendingDelivery(final PagePosition position)
    {
       getPageInfo(position).incrementPendingTX();
@@ -551,6 +549,13 @@
       processACK(position);
    }
 
+   
+   public void lateDeliveryRollback(PagePosition position)
+   {
+      PageCursorInfo cursorInfo = processACK(position);
+      cursorInfo.decrementPendingTX();
+   }
+
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#isComplete(long)
     */
@@ -753,7 +758,7 @@
 
    // To be called only after the ACK has been processed and guaranteed to be on storae
    // The only exception is on non storage events such as not matching messages
-   private void processACK(final PagePosition pos)
+   private PageCursorInfo processACK(final PagePosition pos)
    {
       if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
       {
@@ -770,6 +775,8 @@
       PageCursorInfo info = getPageInfo(pos);
 
       info.addACK(pos);
+      
+      return info;
    }
 
    /**
@@ -783,7 +790,7 @@
          // It needs to persist, otherwise the cursor will return to the fist page position
          tx.setContainsPersistent();
       }
-      
+
       getPageInfo(position).remove(position);
 
       PageCursorTX cursorTX = (PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
@@ -846,7 +853,7 @@
 
       // The page was live at the time of the creation
       private final boolean wasLive;
-      
+
       // There's a pending TX to add elements on this page
       private AtomicInteger pendingTX = new AtomicInteger(0);
 
@@ -902,12 +909,12 @@
       {
          return pageId;
       }
-      
+
       public void incrementPendingTX()
       {
          pendingTX.incrementAndGet();
       }
-      
+
       public void decrementPendingTX()
       {
          pendingTX.decrementAndGet();
@@ -932,12 +939,13 @@
          if (isTrace)
          {
             PageSubscriptionImpl.trace("numberOfMessages =  " + getNumberOfMessages() +
-                                       " confirmed =  " +
-                                       (confirmed.get() + 1) +
-                                       ", page = " +
-                                       pageId);
+                    " confirmed =  " +
+                    (confirmed.get() + 1) +
+                    " pendingTX = " + pendingTX +
+                    ", page = " +
+                    pageId);
          }
-         
+
          // Negative could mean a bookmark on the first element for the page (example -1)
          if (posACK.getMessageNr() >= 0)
          {
@@ -1018,7 +1026,7 @@
 
          }
       }
-      
+
       /* (non-Javadoc)
        * @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
        */
@@ -1026,7 +1034,6 @@
       {
          return Collections.emptyList();
       }
-      
 
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2011-02-07 22:10:53 UTC (rev 10185)
@@ -172,6 +172,25 @@
     */
    public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx) throws Exception
    {
+      internalUpdatePageManager(storageManager, pagingManager, tx, 1);
+   }
+   
+   public void reloadUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx, final int increment) throws Exception
+   {
+      UpdatePageTXOperation updt = internalUpdatePageManager(storageManager, pagingManager, tx, increment);
+      updt.setStored();
+   }
+
+   /**
+    * @param storageManager
+    * @param pagingManager
+    * @param tx
+    */
+   protected UpdatePageTXOperation internalUpdatePageManager(final StorageManager storageManager,
+                                            final PagingManager pagingManager,
+                                            final Transaction tx,
+                                            final int increment)
+   {
       UpdatePageTXOperation pgtxUpdate = (UpdatePageTXOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION_UPDATE);
       
       if (pgtxUpdate == null)
@@ -183,7 +202,9 @@
       
       tx.setContainsPersistent();
       
-      pgtxUpdate.addUpdate(this);
+      pgtxUpdate.addUpdate(this, increment);
+      
+      return pgtxUpdate;
    }
    
    public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager) throws Exception
@@ -213,6 +234,11 @@
    {
       return committed;
    }
+   
+   public void setCommitted(final boolean committed)
+   {
+      this.committed = committed;
+   }
 
    public boolean isRollback()
    {
@@ -228,8 +254,9 @@
       {
          for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
          {
-            pos.a.positionIgnored(pos.b);
+            pos.a.lateDeliveryRollback(pos.b);
          }
+         lateDeliveries = null;
       }
    }
 
@@ -303,8 +330,13 @@
          this.pagingManager = pagingManager;
       }
       
-      public void addUpdate(PageTransactionInfo info)
+      public void setStored()
       {
+         stored = true;
+      }
+      
+      public void addUpdate(final PageTransactionInfo info, final int increment)
+      {
          AtomicInteger counter = countsToUpdate.get(info);
          
          if (counter == null)
@@ -313,7 +345,7 @@
             countsToUpdate.put(info, counter);
          }
          
-         counter.incrementAndGet();
+         counter.addAndGet(increment);
       }
       
       public void beforePrepare(Transaction tx) throws Exception

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-02-07 22:10:53 UTC (rev 10185)
@@ -1664,16 +1664,27 @@
                }
                case PAGE_TRANSACTION:
                {
+                  
                   PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl();
 
                   pageTransactionInfo.decode(buff);
 
-                  tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
+                  if (record.isUpdate)
+                  {
+                     PageTransactionInfo pgTX = pagingManager.getTransaction(pageTransactionInfo.getTransactionID());
+                     pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages());
+                  }
+                  else
+                  {
+                     pageTransactionInfo.setCommitted(false);
+   
+                     tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
+   
+                     pagingManager.addTransaction(pageTransactionInfo);
+   
+                     tx.addOperation(new FinishPageMessageOperation());
+                  }
 
-                  pagingManager.addTransaction(pageTransactionInfo);
-
-                  tx.addOperation(new FinishPageMessageOperation());
-
                   break;
                }
                case SET_SCHEDULED_DELIVERY_TIME:

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2011-02-07 22:10:53 UTC (rev 10185)
@@ -309,8 +309,6 @@
 
          doRollback();
 
-         state = State.ROLLEDBACK;
-
          // We use the Callback even for non persistence
          // If we are using non-persistence with replication, the replication manager will have
          // to execute this runnable in the correct order
@@ -327,6 +325,7 @@
             public void done()
             {
                afterRollback();
+               state = State.ROLLEDBACK;
             }
          });
       }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/LinkedListImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/LinkedListImpl.java	2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/LinkedListImpl.java	2011-02-07 22:10:53 UTC (rev 10185)
@@ -154,20 +154,20 @@
       return (Iterator[])Array.newInstance(Iterator.class, size);
    }
 
-   private void removeAfter(Node<E> after)
+   private void removeAfter(Node<E> node)
    {
-      Node<E> toRemove = after.next;
+      Node<E> toRemove = node.next;
 
-      after.next = toRemove.next;
+      node.next = toRemove.next;
 
       if (toRemove.next != null)
       {
-         toRemove.next.prev = after;
+         toRemove.next.prev = node;
       }
 
       if (toRemove == tail)
       {
-         tail = after;
+         tail = node;
       }
 
       size--;

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java	2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java	2011-02-07 22:10:53 UTC (rev 10185)
@@ -1209,8 +1209,8 @@
       clientSession.rollback(xid);
       clientSession.start();
       m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
       m.acknowledge();
-      Assert.assertNotNull(m);
       Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
       m = clientConsumer.receive(1000);
       Assert.assertNotNull(m);



More information about the hornetq-commits mailing list