[jboss-cvs] JBoss Messaging SVN: r8253 - in branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842: src/main/org/jboss/jms/server/endpoint and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Apr 4 05:39:31 EDT 2011
Author: raggz
Date: 2011-04-04 05:39:31 -0400 (Mon, 04 Apr 2011)
New Revision: 8253
Added:
branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java
Modified:
branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/
branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/tx/ResourceManager.java
branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/messaging/core/impl/tx/Transaction.java
branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java
Log:
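Merged the fix for JIRA issue JBMESSAGING-1837 (roll back deliveries when a one-phase commit fails) from Branch_1_4, revisions 8155-8156.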
Property changes on: branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842
___________________________________________________________________
Modified: svn:mergeinfo
- /branches/Branch_1_4:8114,8157-8158
+ /branches/Branch_1_4:8114,8155-8158
Modified: branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2011-04-04 09:33:38 UTC (rev 8252)
+++ branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2011-04-04 09:39:31 UTC (rev 8253)
@@ -2520,9 +2520,48 @@
public void afterRollback(boolean onePhase) throws TransactionException
{
- //One phase rollbacks never hit the server - they are dealt with locally only
- //so this would only ever be executed for a two phase rollback.
+ if (log.isTraceEnabled()) { log.trace(this + " afterRollback, onePhase: " + onePhase); }
+ //One phase rollbacks usually don't hit the server - they are dealt with locally only
+ //but if a one-phase commit fails, we need to roll back the deliveries
+ if (onePhase)
+ {
+ // Remove the deliveries from the delivery map.
+ Iterator iter = delList.iterator();
+ while (iter.hasNext())
+ {
+ Long deliveryId = (Long)iter.next();
+ DeliveryRecord del = (DeliveryRecord)deliveries.remove(deliveryId);
+
+ if (del != null && del.replicating)
+ {
+ //TODO - we could batch this in one message
+ try
+ {
+ postOffice.sendReplicateAckMessage(del.queueName, del.del.getReference().getMessage().getMessageID());
+ }
+ catch (Exception e)
+ {
+ throw new TransactionException("Failed to handle send ack", e);
+ }
+ }
+
+ if (isCC && del != null)
+ {
+ try
+ {
+ checkClose();
+ }
+ catch (JMSException e)
+ {
+ //we don't need to do anything here.
+ log.warn("Exception closing a CC session " + this);
+ }
+ }
+ }
+ }
+
+ //for a two phase rollback.
//We don't do anything since cancellation is driven from the client.
}
Modified: branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/tx/ResourceManager.java 2011-04-04 09:33:38 UTC (rev 8252)
+++ branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/jms/tx/ResourceManager.java 2011-04-04 09:39:31 UTC (rev 8253)
@@ -699,6 +699,18 @@
throw new MessagingXAException(XAException.XA_RBCOMMFAIL, "A Throwable was caught in sending the transaction", t);
}
}
+ else if (request.getRequestType() == TransactionRequest.ONE_PHASE_COMMIT_REQUEST)
+ {
+ //for one-phase commit, we may have a rollback exception
+ if (t instanceof XAException)
+ {
+ throw new MessagingXAException(XAException.XA_RBOTHER, "A Throwable was caught in sending one phase commit", t);
+ }
+ else
+ {
+ throw new MessagingXAException(XAException.XA_RETRY, "A Throwable was caught in sending one phase commit", t);
+ }
+ }
else
{
throw new MessagingXAException(XAException.XA_RETRY, "A Throwable was caught in sending the transaction", t);
Modified: branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/messaging/core/impl/tx/Transaction.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/messaging/core/impl/tx/Transaction.java 2011-04-04 09:33:38 UTC (rev 8252)
+++ branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/src/main/org/jboss/messaging/core/impl/tx/Transaction.java 2011-04-04 09:39:31 UTC (rev 8253)
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
+import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import org.jboss.logging.Logger;
@@ -203,24 +204,51 @@
boolean onePhase = state != STATE_PREPARED;
- if (firstCallback != null)
+ Iterator iter = null;
+
+ try
{
- firstCallback.beforeCommit(onePhase);
+
+ if (firstCallback != null)
+ {
+ firstCallback.beforeCommit(onePhase);
+ }
+
+ iter = callbacks.iterator();
+
+ while (iter.hasNext())
+ {
+ TxCallback callback = (TxCallback)iter.next();
+
+ callback.beforeCommit(onePhase);
+ }
+
+ state = STATE_COMMITTED;
+
+ if (trace)
+ {
+ log.trace(this + " committed");
+ }
+
}
-
- Iterator iter = callbacks.iterator();
-
- while (iter.hasNext())
+ catch (Exception e)
{
- TxCallback callback = (TxCallback)iter.next();
-
- callback.beforeCommit(onePhase);
+ // for a failed one-phase commit, we need to roll back the transaction.
+ if (onePhase)
+ {
+ if (trace)
+ {
+ log.trace(this + " one-phase commit results in rollback.");
+ }
+
+ rollback();
+
+ throw new XAException(XAException.XA_RBOTHER);
+ }
+ // for 2-pc commit, we throw the exception
+ throw e;
}
- state = STATE_COMMITTED;
-
- if (trace) { log.trace(this + " committed"); }
-
iter = callbacks.iterator();
if (trace) { log.trace(this + " executing after commit hooks"); }
@@ -353,7 +381,7 @@
if (callback instanceof TxCallbackEx)
{
- ((TxCallbackEx)callback).afterRollbackEx(onePhase, recovered);
+ ((TxCallbackEx)callback).afterRollbackEx(onePhase, recovered || onePhase);
}
else
{
Copied: branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java (from rev 8155, branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java (rev 0)
+++ branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java 2011-04-04 09:39:31 UTC (rev 8253)
@@ -0,0 +1,94 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2010, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.core;
+
+import java.util.List;
+import java.util.Properties;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
+import org.jboss.messaging.core.impl.JDBCPersistenceManager;
+import org.jboss.messaging.core.impl.tx.Transaction;
+
+/**
+ * A FakeJDBCPersistenceManager
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ * Created Dec 24, 2010 12:09:22 PM
+ *
+ *
+ */
+public class FakeJDBCPersistenceManager extends JDBCPersistenceManager
+{
+
+ public FakeJDBCPersistenceManager(DataSource ds,
+ TransactionManager tm,
+ Properties sqlProperties,
+ boolean createTablesOnStartup,
+ boolean usingBatchUpdates,
+ boolean usingBinaryStream,
+ boolean usingTrailingByte,
+ int maxParams,
+ boolean supportsBlobSelect,
+ boolean supportsSetNullOnBlobs)
+ {
+ super(ds,
+ tm,
+ sqlProperties,
+ createTablesOnStartup,
+ usingBatchUpdates,
+ usingBinaryStream,
+ usingTrailingByte,
+ maxParams,
+ supportsBlobSelect,
+ supportsSetNullOnBlobs);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ boolean poisoned1pc;
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ public void handleBeforeCommit1PC(final List refsToAdd, final List refsToRemove, final Transaction tx) throws Exception
+ {
+ if (poisoned1pc) throw new Exception("Fake exception");
+ super.handleBeforeCommit1PC(refsToAdd, refsToRemove, tx);
+ }
+
+ public void poisonHandleCommit1PC()
+ {
+ poisoned1pc = true;
+ }
+
+ public void restoreHandleCommit1PC()
+ {
+ poisoned1pc = false;
+ }
+
+}
Modified: branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java 2011-04-04 09:33:38 UTC (rev 8252)
+++ branches/Branch_JBossMessaging_1_4_7_GA_JBMESSAGING-1822_JBMESSAGING-1837_JBMESSAGING-1842/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java 2011-04-04 09:39:31 UTC (rev 8253)
@@ -31,12 +31,13 @@
import javax.transaction.xa.Xid;
import org.jboss.messaging.core.contract.Channel;
+import org.jboss.messaging.core.contract.Delivery;
import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.contract.MessageReference;
import org.jboss.messaging.core.contract.MessageStore;
import org.jboss.messaging.core.contract.PersistenceManager;
-import org.jboss.messaging.core.impl.IDManager;
import org.jboss.messaging.core.impl.JDBCPersistenceManager;
+import org.jboss.messaging.core.impl.MessagingQueue;
import org.jboss.messaging.core.impl.message.SimpleMessageStore;
import org.jboss.messaging.core.impl.tx.Transaction;
import org.jboss.messaging.core.impl.tx.TransactionRepository;
@@ -49,6 +50,8 @@
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
* @version <tt>1.1</tt>
*
* JDBCPersistenceManagerTest.java,v 1.1 2006/02/22 17:33:44 timfox Exp
@@ -106,6 +109,26 @@
return p;
}
+ protected void doSetup2(boolean batch, boolean useBinaryStream,
+ boolean trailingByte, int maxParams) throws Throwable
+ {
+ pm = createPM2(batch, useBinaryStream, trailingByte, maxParams);
+ ms = new SimpleMessageStore();
+ }
+
+ protected JDBCPersistenceManager createPM2(boolean batch, boolean useBinaryStream,
+ boolean trailingByte, int maxParams) throws Throwable
+ {
+ JDBCPersistenceManager p =
+ new FakeJDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
+ sc.getPersistenceManagerSQLProperties(),
+ true, batch, useBinaryStream, trailingByte, maxParams, !sc.getDatabaseName().equals("oracle") && !sc.getDatabaseName().equals("db2"),
+ !sc.getDatabaseName().equals("db2"));
+ ((JDBCPersistenceManager)p).injectNodeID(1);
+ p.start();
+ return p;
+ }
+
public void tearDown() throws Exception
{
sc.stop();
@@ -936,7 +959,71 @@
assertTrue(containsMessage(ms, ref1.getMessage().getMessageID()));
}
+ //https://issues.jboss.org/browse/JBMESSAGING-1837
+ public void testCommitOnePhaseFailure() throws Throwable
+ {
+ doSetup2(true, true, true, 100);
+ TransactionRepository txRep = new TransactionRepository(pm, ms, 0);
+ txRep.start();
+
+ MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, false);
+ queue.activate();
+
+ SimpleReceiver r = new SimpleReceiver("AckingReceiver", SimpleReceiver.ACKING);
+ assertTrue(queue.getLocalDistributor().add(r));
+
+ SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
+
+ log.debug("sending a message");
+
+ Message[] messages = createMessages(1);
+
+ Message m1 = messages[0];
+
+ Transaction tx = txRep.createTransaction(new MockXid());
+
+ MessageReference ref1 = ms.reference(m1);
+
+ Delivery delivery = queue.handle(observer, ref1, null);
+
+ assertTrue(r.getMessages().size() == 1);
+
+ //simulating processing transaction
+ delivery.acknowledge(tx);
+
+ //poison pm
+ ((FakeJDBCPersistenceManager)pm).poisonHandleCommit1PC();
+
+ //commit one phase
+ try
+ {
+ tx.commit();
+ }
+ catch (Exception e)
+ {
+ //ignore the exception.
+ }
+
+ //message received again.
+ assertEquals(2, r.getMessages().size());
+ //delivering count 1
+ assertTrue(queue.getDeliveringCount() == 1);
+
+ //restore
+ ((FakeJDBCPersistenceManager)pm).restoreHandleCommit1PC();
+
+ //another tx
+ tx = txRep.createTransaction(new MockXid());
+ delivery.acknowledge(tx);
+ tx.commit();
+
+ //still received twice.
+ assertTrue(r.getMessages().size() == 2);
+ //delivering count 0
+ assertTrue(queue.getDeliveringCount() == 0);
+ }
+
protected Message createMessage(byte i, boolean reliable) throws Throwable
{
Map headers = generateFilledMap(true);
More information about the jboss-cvs-commits mailing list