[hornetq-commits] JBoss hornetq SVN: r8326 - in branches/ClebertTemporary/src/main/org/hornetq/core: server/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 19 20:00:18 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-19 20:00:17 -0500 (Thu, 19 Nov 2009)
New Revision: 8326

Modified:
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
Log:
Fixing tests

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-19 19:53:49 UTC (rev 8325)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-20 01:00:17 UTC (rev 8326)
@@ -2964,7 +2964,10 @@
             {
                // Set the delegated callback as a parameter
                TransactionCallback txcallback = tx.getCallback(currentFile);
-               txcallback.setDelegateCompletion(parameterCallback);
+               if (parameterCallback != null)
+               {
+                  txcallback.setDelegateCompletion(parameterCallback);
+               }
                callback = txcallback;
             }
             else

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-19 19:53:49 UTC (rev 8325)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-20 01:00:17 UTC (rev 8326)
@@ -15,6 +15,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -280,7 +281,7 @@
 
          bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
 
-         callbacks = new ArrayList<IOAsyncTask>();
+         callbacks = new LinkedList<IOAsyncTask>();
 
          active = false;
          pendingSync = false;

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-19 19:53:49 UTC (rev 8325)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-20 01:00:17 UTC (rev 8326)
@@ -627,6 +627,8 @@
       {
          acknowledge(ref);
       }
+      
+      storageManager.completeOperations();
    }
 
    public void setExpiryAddress(final SimpleString expiryAddress)
@@ -945,6 +947,7 @@
                   try
                   {
                      storageManager.updateDeliveryCount(reference);
+                     storageManager.completeOperations();
                   }
                   catch (Exception e)
                   {
@@ -974,6 +977,7 @@
                   try
                   {
                      sendToDeadLetterAddress(reference);
+                     storageManager.completeOperations();
                   }
                   catch (Exception e)
                   {
@@ -1005,7 +1009,8 @@
                   {
                      try
                      {
-                        sendToDeadLetterAddress(reference);
+                        storageManager.updateScheduledDeliveryTime(reference);
+                        storageManager.completeOperations();
                      }
                      catch (Exception e)
                      {

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2009-11-19 19:53:49 UTC (rev 8325)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2009-11-20 01:00:17 UTC (rev 8326)
@@ -147,14 +147,38 @@
          storageManager.prepare(id, xid);
 
          state = State.PREPARED;
+         // We use the Callback even for non persistence
+         // If we are using non-persistence with replication, the replication manager will have
+         // to execute this runnable in the correct order
+         storageManager.afterCompleteOperations(new IOAsyncTask()
+         {
 
-         if (operations != null)
-         {
-            for (TransactionOperation operation : operations)
+            public void onError(int errorCode, String errorMessage)
             {
-               operation.afterPrepare(this);
+               log.warn("IO Error completing the transaction, code = " + errorCode + ", message = " + errorMessage);
             }
-         }
+
+            public void done()
+            {
+               if (operations != null)
+               {
+                  System.out.println("Prepare was executed fine");
+                  for (TransactionOperation operation : operations)
+                  {
+                     try
+                     {
+                        operation.afterPrepare(TransactionImpl.this);
+                     }
+                     catch (Exception e)
+                     {
+                        // https://jira.jboss.org/jira/browse/HORNETQ-188
+                        // After commit shouldn't throw an exception
+                        log.warn(e.getMessage(), e);
+                     }
+                  }
+               }
+            }
+         });
       }
    }
 
@@ -186,7 +210,11 @@
             {
                if (state == State.ACTIVE)
                {
-                  prepare();
+                  // Why do we need a prepare record on the onePhase optimization?
+                  // Why we can't just go straight to commit, if we are doing one phase anyway?
+                  state = State.PREPARED;
+//                  System.out.println("Adding Prepare");
+//                  prepare();
                }
             }
             if (state != State.PREPARED)
@@ -212,6 +240,7 @@
 
          if (containsPersistent || (xid != null && state == State.PREPARED))
          {
+            System.out.println("Adding commit");
             storageManager.commit(id);
 
             state = State.COMMITTED;
@@ -230,6 +259,7 @@
 
             public void done()
             {
+               System.out.println("Commit was executed fine");
                if (operations != null)
                {
                   for (TransactionOperation operation : operations)
@@ -283,13 +313,38 @@
 
          state = State.ROLLEDBACK;
 
-         if (operations != null)
+         // We use the Callback even for non persistence
+         // If we are using non-persistence with replication, the replication manager will have
+         // to execute this runnable in the correct order
+         storageManager.afterCompleteOperations(new IOAsyncTask()
          {
-            for (TransactionOperation operation : operations)
+
+            public void onError(int errorCode, String errorMessage)
             {
-               operation.afterRollback(this);
+               log.warn("IO Error completing the transaction, code = " + errorCode + ", message = " + errorMessage);
             }
-         }
+
+            public void done()
+            {
+               if (operations != null)
+               {
+                  System.out.println("Rollback was executed fine");
+                  for (TransactionOperation operation : operations)
+                  {
+                     try
+                     {
+                        operation.afterRollback(TransactionImpl.this);
+                     }
+                     catch (Exception e)
+                     {
+                        // https://jira.jboss.org/jira/browse/HORNETQ-188
+                        // After commit shouldn't throw an exception
+                        log.warn(e.getMessage(), e);
+                     }
+                  }
+               }
+            }
+         });
       }
    }
 



More information about the hornetq-commits mailing list