[hornetq-commits] JBoss hornetq SVN: r9407 - in trunk: src/main/org/hornetq/core/server/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jul 19 05:06:53 EDT 2010


Author: jmesnil
Date: 2010-07-19 05:06:52 -0400 (Mon, 19 Jul 2010)
New Revision: 9407

Modified:
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/src/main/org/hornetq/core/transaction/Transaction.java
   trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
   trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-442: Out of Order delivery with depaging during a transaction

* ensure that during a tx completion, message routed to a an address which is depaging keep the transactional context
  and remain in order

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-07-19 09:06:52 UTC (rev 9407)
@@ -14,8 +14,6 @@
 package org.hornetq.core.postoffice.impl;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -60,9 +58,9 @@
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.TypedProperties;
 import org.hornetq.utils.UUIDGenerator;
@@ -1104,20 +1102,14 @@
    private class PageMessageOperation implements TransactionOperation
    {
       private final List<ServerMessage> messagesToPage = new ArrayList<ServerMessage>();
-
+      
+      private Transaction subTX = null;
+      
       void addMessageToPage(final ServerMessage message)
       {
          messagesToPage.add(message);
       }
 
-      /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.TransactionOperation#getDistinctQueues()
-       */
-      public Collection<Queue> getDistinctQueues()
-      {
-         return Collections.emptySet();
-      }
-
       public void afterCommit(final Transaction tx)
       {
          // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
@@ -1130,10 +1122,19 @@
          {
             pageTransaction.commit();
          }
+         
+         if (subTX != null)
+         {
+            subTX.afterCommit();
+         }
       }
 
       public void afterPrepare(final Transaction tx)
       {
+         if (subTX != null)
+         {
+            subTX.afterPrepare();
+         }
       }
 
       public void afterRollback(final Transaction tx)
@@ -1144,6 +1145,11 @@
          {
             pageTransaction.rollback();
          }
+
+         if (subTX != null)
+         {
+            subTX.afterRollback();
+         }
       }
 
       public void beforeCommit(final Transaction tx) throws Exception
@@ -1152,15 +1158,30 @@
          {
             pageMessages(tx);
          }
+         
+         if (subTX != null)
+         {
+            subTX.beforeCommit();
+         }
+         
       }
 
       public void beforePrepare(final Transaction tx) throws Exception
       {
          pageMessages(tx);
+         
+         if (subTX != null)
+         {
+            subTX.beforePrepare();
+         }
       }
 
       public void beforeRollback(final Transaction tx) throws Exception
       {
+         if (subTX != null)
+         {
+            subTX.beforeRollback();
+         }
       }
 
       private void pageMessages(final Transaction tx) throws Exception
@@ -1201,9 +1222,13 @@
                else
                {
                   // This could happen when the PageStore left the pageState
-
-                  // TODO is this correct - don't we lose transactionality here???
-                  route(message, false);
+                  // we create a copy of the transaction so that messages are routed with the same tx ID.
+                  // but we can not use directly the tx as it has already its own set of TransactionOperations
+                  if (subTX == null)
+                  {
+                     subTX = tx.copy();
+                  }
+                  route(message, subTX, false);
                }
                first = false;
             }

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-07-19 09:06:52 UTC (rev 9407)
@@ -1385,7 +1385,7 @@
       return status;
    }
 
-   private void postAcknowledge(final MessageReference ref) throws Exception
+   private void postAcknowledge(final MessageReference ref)
    {
       final ServerMessage message = ref.getMessage();
 
@@ -1423,7 +1423,14 @@
 
       queue.deliveringCount.decrementAndGet();
 
-      message.decrementRefCount();
+      try
+      {
+         message.decrementRefCount();
+      }
+      catch (Exception e)
+      {
+         QueueImpl.log.warn("Unable to decrement reference counting", e);
+      }
    }
 
    void postRollback(final LinkedList<MessageReference> refs)
@@ -1500,7 +1507,7 @@
          }
       }
 
-      public void afterCommit(final Transaction tx) throws Exception
+      public void afterCommit(final Transaction tx)
       {
          for (MessageReference ref : refsToAck)
          {

Modified: trunk/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/Transaction.java	2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/transaction/Transaction.java	2010-07-19 09:06:52 UTC (rev 9407)
@@ -22,7 +22,7 @@
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
- */
+ */   
 public interface Transaction
 {
    void prepare() throws Exception;
@@ -32,6 +32,10 @@
    void commit(boolean onePhase) throws Exception;
 
    void rollback() throws Exception;
+   
+   /** Used for pages during commit.
+    *  When paging messages we need to guarantee that they are in the same transaction (but not with the same set of TransactionOperation). */
+   Transaction copy();
 
    int getOperationsCount();
 
@@ -64,9 +68,24 @@
    void setContainsPersistent();
 
    void setTimeout(int timeout);
+   
+   // To be used by sub-contexts. Mainly on paging
+   
+   void beforeCommit() throws Exception;
+   
+   void beforeRollback() throws Exception;
+   
+   void beforePrepare() throws Exception;;
+   
+   void afterPrepare();
+   
+   void afterCommit();
+   
+   void afterRollback();
 
    static enum State
    {
       ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY
    }
+
 }

Modified: trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java	2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java	2010-07-19 09:06:52 UTC (rev 9407)
@@ -24,13 +24,16 @@
 {
    void beforePrepare(Transaction tx) throws Exception;
 
+   /** After prepare shouldn't throw any exception. Any verification has to be done on before prepare */
+   void afterPrepare(Transaction tx);
+
    void beforeCommit(Transaction tx) throws Exception;
 
+   /** After commit shouldn't throw any exception. Any verification has to be done on before commit */
+   void afterCommit(Transaction tx);
+
    void beforeRollback(Transaction tx) throws Exception;
 
-   void afterPrepare(Transaction tx) throws Exception;
-
-   void afterCommit(Transaction tx) throws Exception;
-
-   void afterRollback(Transaction tx) throws Exception;
+   /** After rollback shouldn't throw any exception. Any verification has to be done on before rollback */
+   void afterRollback(Transaction tx);
 }

Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2010-07-19 09:06:52 UTC (rev 9407)
@@ -73,6 +73,20 @@
       this.timeoutSeconds = timeoutSeconds;
    }
 
+   /** Used for copying */
+   private TransactionImpl(final TransactionImpl other)
+   {
+      this.storageManager = other.storageManager;
+
+      this.xid = other.xid;
+
+      this.id = other.id;
+
+      this.createTime = other.createTime;
+
+      this.timeoutSeconds = other.timeoutSeconds;
+   }
+
    public TransactionImpl(final StorageManager storageManager)
    {
       this.storageManager = storageManager;
@@ -130,7 +144,7 @@
    {
       return createTime;
    }
-
+   
    public boolean hasTimedOut(final long currentTime,final int defaultTimeout)
    {
       if(timeoutSeconds == - 1)
@@ -169,13 +183,7 @@
             throw new IllegalStateException("Cannot prepare non XA transaction");
          }
 
-         if (operations != null)
-         {
-            for (TransactionOperation operation : operations)
-            {
-               operation.beforePrepare(this);
-            }
-         }
+         beforePrepare();
 
          storageManager.prepare(id, xid);
 
@@ -195,22 +203,7 @@
 
             public void done()
             {
-               if (operations != null)
-               {
-                  for (TransactionOperation operation : operations)
-                  {
-                     try
-                     {
-                        operation.afterPrepare(TransactionImpl.this);
-                     }
-                     catch (Exception e)
-                     {
-                        // https://jira.jboss.org/jira/browse/HORNETQ-188
-                        // After commit shouldn't throw an exception
-                        TransactionImpl.log.warn(e.getMessage(), e);
-                     }
-                  }
-               }
+               afterPrepare();
             }
          });
       }
@@ -252,15 +245,9 @@
                throw new IllegalStateException("Transaction is in invalid state " + state);
             }
          }
+         
+         beforeCommit();
 
-         if (operations != null)
-         {
-            for (TransactionOperation operation : operations)
-            {
-               operation.beforeCommit(this);
-            }
-         }
-
          if (containsPersistent || xid != null && state == State.PREPARED)
          {
             storageManager.commit(id);
@@ -285,22 +272,7 @@
 
             public void done()
             {
-               if (operations != null)
-               {
-                  for (TransactionOperation operation : operations)
-                  {
-                     try
-                     {
-                        operation.afterCommit(TransactionImpl.this);
-                     }
-                     catch (Exception e)
-                     {
-                        // https://jira.jboss.org/jira/browse/HORNETQ-188
-                        // After commit shouldn't throw an exception
-                        TransactionImpl.log.warn(e.getMessage(), e);
-                     }
-                  }
-               }
+               afterCommit();
             }
          });
 
@@ -326,13 +298,7 @@
             }
          }
 
-         if (operations != null)
-         {
-            for (TransactionOperation operation : operations)
-            {
-               operation.beforeRollback(this);
-            }
-         }
+         beforeRollback();
 
          doRollback();
 
@@ -353,22 +319,7 @@
 
             public void done()
             {
-               if (operations != null)
-               {
-                  for (TransactionOperation operation : operations)
-                  {
-                     try
-                     {
-                        operation.afterRollback(TransactionImpl.this);
-                     }
-                     catch (Exception e)
-                     {
-                        // https://jira.jboss.org/jira/browse/HORNETQ-188
-                        // After commit shouldn't throw an exception
-                        TransactionImpl.log.warn(e.getMessage(), e);
-                     }
-                  }
-               }
+               afterRollback();
             }
          });
       }
@@ -471,4 +422,75 @@
       }
    }
 
+   public Transaction copy()
+   {
+      return new TransactionImpl(this);
+   }
+
+   public void afterCommit()
+   {
+      if (operations != null)
+      {
+         for (TransactionOperation operation : operations)
+         {
+            operation.afterCommit(this);
+         }
+      }
+   }
+
+   public void afterRollback()
+   {
+      if (operations != null)
+      {
+         for (TransactionOperation operation : operations)
+         {
+            operation.afterRollback(this);
+         }
+      }
+   }
+
+   public void beforeCommit() throws Exception
+   {
+      if (operations != null)
+      {
+         for (TransactionOperation operation : operations)
+         {
+            operation.beforeCommit(this);
+         }
+      }
+   }
+
+   public void beforePrepare() throws Exception
+   {
+      if (operations != null)
+      {
+         for (TransactionOperation operation : operations)
+         {
+            operation.beforePrepare(this);
+         }
+      }
+   }
+
+   public void beforeRollback() throws Exception
+   {
+      if (operations != null)
+      {
+         for (TransactionOperation operation : operations)
+         {
+            operation.beforeRollback(this);
+         }
+      }
+   }
+
+   public void afterPrepare()
+   {
+      if (operations != null)
+      {
+         for (TransactionOperation operation : operations)
+         {
+            operation.afterPrepare(this);
+         }
+      }
+   }
+
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-07-19 09:06:52 UTC (rev 9407)
@@ -247,7 +247,7 @@
 
       server.start();
 
-      final int numberOfIntegers = 256;
+      final int messageSize = 1024; // 1k
 
       try
       {
@@ -263,7 +263,7 @@
 
          ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
 
-         byte[] body = new byte[DataConstants.SIZE_INT * numberOfIntegers];
+         byte[] body = new byte[messageSize];
          // HornetQBuffer bodyLocal = HornetQChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfIntegers);
 
          ClientMessage message = null;
@@ -372,7 +372,7 @@
     *  Test under discussion at : http://community.jboss.org/thread/154061?tstart=0
     * 
     */
-   public void disabled_testDepageDuringTransaction2() throws Exception
+   public void testDepageDuringTransaction2() throws Exception
    {
       boolean IS_DURABLE_MESSAGE = true;
       clearData();

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2010-07-19 09:06:52 UTC (rev 9407)
@@ -292,6 +292,35 @@
          
       }
 
+      public Transaction copy()
+      {
+         return null;
+      }
+
+      public void afterCommit()
+      {
+      }
+
+      public void afterPrepare()
+      {
+      }
+
+      public void afterRollback()
+      {
+      }
+
+      public void beforeCommit() throws Exception
+      {
+      }
+
+      public void beforePrepare() throws Exception
+      {
+      }
+
+      public void beforeRollback() throws Exception
+      {
+      }
+
    }
 
    class FakeMessage implements ServerMessage



More information about the hornetq-commits mailing list