[jboss-cvs] JBoss Messaging SVN: r8253 - in branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842: src/main/org/jboss/jms/server/endpoint and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Apr 4 05:39:31 EDT 2011


Author: raggz
Date: 2011-04-04 05:39:31 -0400 (Mon, 04 Apr 2011)
New Revision: 8253

Added:
   branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java
Modified:
   branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/
   branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/tx/ResourceManager.java
   branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/messaging/core/impl/tx/Transaction.java
   branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java
Log:
Merged JIRA JBMessaging-1837.



Property changes on: branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842
___________________________________________________________________
Modified: svn:mergeinfo
   - /branches/Branch_1_4:8114,8157-8158
   + /branches/Branch_1_4:8114,8155-8158

Modified: branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2011-04-04 09:33:38 UTC (rev 8252)
+++ branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2011-04-04 09:39:31 UTC (rev 8253)
@@ -2520,9 +2520,48 @@
 
       public void afterRollback(boolean onePhase) throws TransactionException
       {
-         //One phase rollbacks never hit the server - they are dealt with locally only
-         //so this would only ever be executed for a two phase rollback.
+         if (log.isTraceEnabled()) { log.trace(this + " afterRollback, onePhase: " + onePhase); }
+         //One phase rollbacks usually don't hit the server - they are dealt with locally only
+         //but if one-phase commit fails, we need to rollback the delivery
+         if (onePhase)
+         {
+            // Remove the deliveries from the delivery map.
+            Iterator iter = delList.iterator();
+            while (iter.hasNext())
+            {
+               Long deliveryId = (Long)iter.next();
 
+               DeliveryRecord del = (DeliveryRecord)deliveries.remove(deliveryId);
+
+               if (del != null && del.replicating)
+               {
+                  //TODO - we could batch this in one message
+                  try
+                  {
+                     postOffice.sendReplicateAckMessage(del.queueName, del.del.getReference().getMessage().getMessageID());
+                  }
+                  catch (Exception e)
+                  {
+                     throw new TransactionException("Failed to handle send ack", e);
+                  }
+               }
+
+               if (isCC && del != null)
+               {
+                  try
+                  {
+                     checkClose();
+                  }
+                  catch (JMSException e)
+                  {
+                     //we don't need to do anything here.
+                     log.warn("Exception closing a CC session " + this);
+                  }
+               }
+            }
+         }
+         
+         //for a two phase rollback.
          //We don't do anything since cancellation is driven from the client.
       }
 

Modified: branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/tx/ResourceManager.java	2011-04-04 09:33:38 UTC (rev 8252)
+++ branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/tx/ResourceManager.java	2011-04-04 09:39:31 UTC (rev 8253)
@@ -699,6 +699,18 @@
                throw new MessagingXAException(XAException.XA_RBCOMMFAIL, "A Throwable was caught in sending the transaction", t);
             }            
          }
+         else if (request.getRequestType() == TransactionRequest.ONE_PHASE_COMMIT_REQUEST)
+         {
+            //for one-phase commit, we may have a rollback exception
+            if (t instanceof XAException)
+            {
+               throw new MessagingXAException(XAException.XA_RBOTHER, "A Throwable was caught in sending one phase commit", t);
+            }
+            else
+            {
+               throw new MessagingXAException(XAException.XA_RETRY, "A Throwable was caught in sending one phase commit", t);            
+            }
+         }
          else
          {
             throw new MessagingXAException(XAException.XA_RETRY, "A Throwable was caught in sending the transaction", t);            

Modified: branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/messaging/core/impl/tx/Transaction.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/messaging/core/impl/tx/Transaction.java	2011-04-04 09:33:38 UTC (rev 8252)
+++ branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/messaging/core/impl/tx/Transaction.java	2011-04-04 09:39:31 UTC (rev 8253)
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 
+import javax.transaction.xa.XAException;
 import javax.transaction.xa.Xid;
 
 import org.jboss.logging.Logger;
@@ -203,24 +204,51 @@
        
       boolean onePhase = state != STATE_PREPARED;
       
-      if (firstCallback != null)
+      Iterator iter = null;
+
+      try
       {
-         firstCallback.beforeCommit(onePhase);
+         
+         if (firstCallback != null)
+         {
+            firstCallback.beforeCommit(onePhase);
+         }
+
+         iter = callbacks.iterator();
+
+         while (iter.hasNext())
+         {
+            TxCallback callback = (TxCallback)iter.next();
+
+            callback.beforeCommit(onePhase);
+         }
+
+         state = STATE_COMMITTED;
+
+         if (trace)
+         {
+            log.trace(this + " committed");
+         }
+
       }
-      
-      Iterator iter = callbacks.iterator();
-      
-      while (iter.hasNext())
+      catch (Exception e)
       {
-         TxCallback callback = (TxCallback)iter.next();
-         
-         callback.beforeCommit(onePhase);
+         // for one-phase commit, we need to rollback the message.
+         if (onePhase)
+         {
+            if (trace)
+            {
+               log.trace(this + " one-phase commit results in rollback.");
+            }
+            
+            rollback();
+            
+            throw new XAException(XAException.XA_RBOTHER);
+         }
+         // for 2-pc commit, we throw the exception
+         throw e;
       }
       
-      state = STATE_COMMITTED;
-      
-      if (trace) { log.trace(this + " committed"); }
-      
       iter = callbacks.iterator();
       
       if (trace) { log.trace(this + " executing after commit hooks"); }
@@ -353,7 +381,7 @@
          
          if (callback instanceof TxCallbackEx)
          {
-            ((TxCallbackEx)callback).afterRollbackEx(onePhase, recovered);            
+            ((TxCallbackEx)callback).afterRollbackEx(onePhase, recovered || onePhase);            
          }
          else
          {

Copied: branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java (from rev 8155, branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java	                        (rev 0)
+++ branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java	2011-04-04 09:39:31 UTC (rev 8253)
@@ -0,0 +1,94 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2010, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.core;
+
+import java.util.List;
+import java.util.Properties;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
+import org.jboss.messaging.core.impl.JDBCPersistenceManager;
+import org.jboss.messaging.core.impl.tx.Transaction;
+
+/**
+ * A FakeJDBCPersistenceManager
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
+ * Created Dec 24, 2010 12:09:22 PM
+ *
+ *
+ */
+public class FakeJDBCPersistenceManager extends JDBCPersistenceManager
+{
+
+   public FakeJDBCPersistenceManager(DataSource ds,
+                                     TransactionManager tm,
+                                     Properties sqlProperties,
+                                     boolean createTablesOnStartup,
+                                     boolean usingBatchUpdates,
+                                     boolean usingBinaryStream,
+                                     boolean usingTrailingByte,
+                                     int maxParams,
+                                     boolean supportsBlobSelect,
+                                     boolean supportsSetNullOnBlobs)
+   {
+      super(ds,
+            tm,
+            sqlProperties,
+            createTablesOnStartup,
+            usingBatchUpdates,
+            usingBinaryStream,
+            usingTrailingByte,
+            maxParams,
+            supportsBlobSelect,
+            supportsSetNullOnBlobs);
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   boolean poisoned1pc;
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   public void handleBeforeCommit1PC(final List refsToAdd, final List refsToRemove, final Transaction tx) throws Exception
+   {
+      if (poisoned1pc) throw new Exception("Fake exception");
+      super.handleBeforeCommit1PC(refsToAdd, refsToRemove, tx);
+   }
+
+   public void poisonHandleCommit1PC()
+   {
+      poisoned1pc = true;
+   }
+   
+   public void restoreHandleCommit1PC()
+   {
+      poisoned1pc = false;
+   }
+
+}

Modified: branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java	2011-04-04 09:33:38 UTC (rev 8252)
+++ branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java	2011-04-04 09:39:31 UTC (rev 8253)
@@ -31,12 +31,13 @@
 import javax.transaction.xa.Xid;
 
 import org.jboss.messaging.core.contract.Channel;
+import org.jboss.messaging.core.contract.Delivery;
 import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.core.contract.MessageReference;
 import org.jboss.messaging.core.contract.MessageStore;
 import org.jboss.messaging.core.contract.PersistenceManager;
-import org.jboss.messaging.core.impl.IDManager;
 import org.jboss.messaging.core.impl.JDBCPersistenceManager;
+import org.jboss.messaging.core.impl.MessagingQueue;
 import org.jboss.messaging.core.impl.message.SimpleMessageStore;
 import org.jboss.messaging.core.impl.tx.Transaction;
 import org.jboss.messaging.core.impl.tx.TransactionRepository;
@@ -49,6 +50,8 @@
 
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
  * @version <tt>1.1</tt>
  *
  * JDBCPersistenceManagerTest.java,v 1.1 2006/02/22 17:33:44 timfox Exp
@@ -106,6 +109,26 @@
       return p;
    }
 
+   protected void doSetup2(boolean batch, boolean useBinaryStream,
+                          boolean trailingByte, int maxParams) throws Throwable
+   {
+      pm = createPM2(batch, useBinaryStream, trailingByte, maxParams);
+      ms = new SimpleMessageStore();
+   }
+
+   protected JDBCPersistenceManager createPM2(boolean batch, boolean useBinaryStream,
+                                             boolean trailingByte, int maxParams) throws Throwable
+   {
+      JDBCPersistenceManager p =
+         new FakeJDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
+                  sc.getPersistenceManagerSQLProperties(),
+                  true, batch, useBinaryStream, trailingByte, maxParams, !sc.getDatabaseName().equals("oracle") && !sc.getDatabaseName().equals("db2"),
+                  !sc.getDatabaseName().equals("db2"));
+      ((JDBCPersistenceManager)p).injectNodeID(1);
+      p.start();
+      return p;
+   }
+
    public void tearDown() throws Exception
    {
       sc.stop();
@@ -936,7 +959,71 @@
       assertTrue(containsMessage(ms, ref1.getMessage().getMessageID()));
    }
 
+   //https://issues.jboss.org/browse/JBMESSAGING-1837
+   public void testCommitOnePhaseFailure() throws Throwable
+   {
+      doSetup2(true, true, true, 100);
 
+      TransactionRepository txRep = new TransactionRepository(pm, ms, 0);
+      txRep.start();
+      
+      MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, false);
+      queue.activate();
+      
+      SimpleReceiver r = new SimpleReceiver("AckingReceiver", SimpleReceiver.ACKING);
+      assertTrue(queue.getLocalDistributor().add(r));
+
+      SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
+
+      log.debug("sending a message");
+
+      Message[] messages = createMessages(1);
+
+      Message m1 = messages[0];
+
+      Transaction tx = txRep.createTransaction(new MockXid());
+
+      MessageReference ref1 = ms.reference(m1);
+      
+      Delivery delivery = queue.handle(observer, ref1, null);
+      
+      assertTrue(r.getMessages().size() == 1);
+      
+      //simulating processing transaction
+      delivery.acknowledge(tx);
+      
+      //poison pm
+      ((FakeJDBCPersistenceManager)pm).poisonHandleCommit1PC();
+      
+      //commit one phase
+      try
+      {
+         tx.commit();
+      }
+      catch (Exception e)
+      {
+         //ignore the exception.
+      }
+      
+      //message received again.
+      assertEquals(2, r.getMessages().size());
+      //delivering count 1
+      assertTrue(queue.getDeliveringCount() == 1);
+      
+      //restore
+      ((FakeJDBCPersistenceManager)pm).restoreHandleCommit1PC();
+      
+      //another tx
+      tx = txRep.createTransaction(new MockXid());
+      delivery.acknowledge(tx);
+      tx.commit();
+      
+      //still received twice.
+      assertTrue(r.getMessages().size() == 2);
+      //delivering count 0
+      assertTrue(queue.getDeliveringCount() == 0);
+   }
+
    protected Message createMessage(byte i, boolean reliable) throws Throwable
    {
       Map headers = generateFilledMap(true);



More information about the jboss-cvs-commits mailing list