[jboss-svn-commits] JBL Code SVN: r38112 - in labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta: src/org/jboss/internal/soa/esb/couriers/tx and 1 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed May 30 13:38:04 EDT 2012
Author: tcunning
Date: 2012-05-30 13:38:01 -0400 (Wed, 30 May 2012)
New Revision: 38112
Modified:
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java
Log:
JBESB-3743
Commit Kevin's patch for a single XAResource in one transaction.
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java 2012-05-30 04:17:28 UTC (rev 38111)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java 2012-05-30 17:38:01 UTC (rev 38112)
@@ -21,7 +21,6 @@
*/
package org.jboss.internal.soa.esb.couriers.transport;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
@@ -38,13 +37,9 @@
import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.couriers.tx.InVMXAResource;
-import org.jboss.internal.soa.esb.message.format.MessageSerializer;
import org.jboss.soa.esb.addressing.eprs.InVMEpr;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
-import org.jboss.soa.esb.common.TransactionStrategy;
-import org.jboss.soa.esb.common.TransactionStrategyException;
-import org.jboss.soa.esb.message.ByReferenceMessage;
import org.jboss.soa.esb.message.Message;
import com.arjuna.common.util.propertyservice.PropertyManager;
@@ -134,15 +129,8 @@
* Can't do lockstep wait here because otherwise the transaction may not terminate if this
* is the transaction controller thread!
*/
- final TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true) ;
- try
- {
- txStrategy.enlistResource(new InVMXAResource(inVMEpr, addedObject, InVMXAResource.Operation.INSERT));
- }
- catch (final TransactionStrategyException tse)
- {
- throw new InVMException("Unexpected error enlisting transaction resource", tse) ;
- }
+ final InVMXAResource resource = InVMResourceManager.getInstance().getXAResource() ;
+ resource.addEntry(inVMEpr, addedObject, InVMXAResource.Operation.INSERT);
}
else
{
@@ -218,15 +206,8 @@
{
LOGGER.debug("Pickup enlisting transactional resource for service " + serviceId) ;
}
- final TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true) ;
- try
- {
- txStrategy.enlistResource(new InVMXAResource(inVMEpr, msgObject, InVMXAResource.Operation.REMOVE)) ;
- }
- catch (final TransactionStrategyException tse)
- {
- throw new InVMException("Unexpected error enlisting transaction resource", tse) ;
- }
+ final InVMXAResource resource = InVMResourceManager.getInstance().getXAResource() ;
+ resource.addEntry(inVMEpr, msgObject, InVMXAResource.Operation.REMOVE) ;
}
return message ;
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java 2012-05-30 04:17:28 UTC (rev 38111)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java 2012-05-30 17:38:01 UTC (rev 38112)
@@ -394,15 +394,8 @@
* Can't do lockstep wait here because otherwise the transaction may not terminate if this
* is the transaction controller thread!
*/
- final TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true) ;
- try
- {
- txStrategy.enlistResource(new InVMXAResource(inVMEpr, addedObject, InVMXAResource.Operation.INSERT));
- }
- catch (final TransactionStrategyException tse)
- {
- throw new InVMException("Unexpected error enlisting transaction resource", tse) ;
- }
+ final InVMXAResource resource = InVMResourceManager.getInstance().getXAResource() ;
+ resource.addEntry(inVMEpr, addedObject, InVMXAResource.Operation.INSERT);
}
else
{
@@ -463,15 +456,8 @@
{
LOGGER.debug("Pickup enlisting transactional resource for service " + serviceId) ;
}
- final TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true) ;
- try
- {
- txStrategy.enlistResource(new InVMXAResource(inVMEpr, msgObject, InVMXAResource.Operation.REMOVE)) ;
- }
- catch (final TransactionStrategyException tse)
- {
- throw new InVMException("Unexpected error enlisting transaction resource", tse) ;
- }
+ final InVMXAResource resource = InVMResourceManager.getInstance().getXAResource() ;
+ resource.addEntry(inVMEpr, msgObject, InVMXAResource.Operation.REMOVE) ;
}
return message ;
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java 2012-05-30 04:17:28 UTC (rev 38111)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java 2012-05-30 17:38:01 UTC (rev 38112)
@@ -23,12 +23,15 @@
package org.jboss.internal.soa.esb.couriers.tx;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.transport.InVMResourceManager;
import org.jboss.internal.soa.esb.couriers.transport.InVMTemporaryTransport;
import org.jboss.internal.soa.esb.couriers.transport.InVMTransport;
import org.jboss.internal.soa.esb.couriers.transport.InVMException;
@@ -89,11 +92,9 @@
}
}
- public InVMXAResource (final InVMEpr inVMEpr, final Object msgObject, final Operation op)
+ public void addEntry(final InVMEpr inVMEpr, final Object msgObject, final Operation op)
{
- this.inVMEpr = inVMEpr ;
- this.msgObject = msgObject ;
- _opcode = op;
+ entries.add(new InVMXAResourceEntry(inVMEpr, msgObject, op)) ;
}
/*
@@ -106,27 +107,34 @@
public void commit (Xid xid, boolean onePhase) throws XAException
{
- if (_opcode == Operation.INSERT)
+ removeXAResource() ;
+ if (entries != null)
{
- boolean problem = false;
-
- try
+ for(InVMXAResourceEntry entry: entries)
{
- deliverTx() ;
+ if (entry.getOpcode() == Operation.INSERT)
+ {
+ boolean problem = false;
+
+ try
+ {
+ deliverTx(entry.getInVMEpr(), entry.getMsgObject()) ;
+ }
+ catch (final Exception ex)
+ {
+ _logger.debug("Unexpected exception received when delivering to the courier", ex) ;
+
+ problem = true; // oops!
+ }
+
+ if (problem) // not a lot we can do at this stage
+ {
+ _logger.warn("InVMXAResource failed to commit to the InVM queue!");
+
+ throw new XAException(XAException.XA_HEURHAZ);
+ }
+ }
}
- catch (final Exception ex)
- {
- _logger.debug("Unexpected exception received when delivering to the courier", ex) ;
-
- problem = true; // oops!
- }
-
- if (problem) // not a lot we can do at this stage
- {
- _logger.warn("InVMXAResource failed to commit to the InVM queue!");
-
- throw new XAException(XAException.XA_HEURHAZ);
- }
}
}
@@ -168,52 +176,62 @@
public void rollback (Xid xid) throws XAException
{
- if (_opcode == Operation.REMOVE) // put the message back on the queue
+ removeXAResource() ;
+ if (entries != null)
{
- /*
- * The message goes back on the queue. This may not be the same
- * location as it had previously, but any attempt to do a truly
- * transactional queue will affect performance adversely, for relatively
- * little benefit. In an asynchronous world, applications should not
- * be written assuming that a queue (or any transport) guarantees ordering.
- * The ordering (or lack thereof) should be dealt with at the application
- * level, where possible.
- *
- * TODO we could add a queue-insertion policy that allows the developer to override
- * how the message gets placed back into the queue.
- */
- if(assertRedeliver()) {
- boolean problem = false;
- try
+ for(InVMXAResourceEntry entry: entries)
+ {
+ if (entry.getOpcode() == Operation.REMOVE) // put the message back on the queue
{
- deliverTx() ;
+ /*
+ * The message goes back on the queue. This may not be the same
+ * location as it had previously, but any attempt to do a truly
+ * transactional queue will affect performance adversely, for relatively
+ * little benefit. In an asynchronous world, applications should not
+ * be written assuming that a queue (or any transport) guarantees ordering.
+ * The ordering (or lack thereof) should be dealt with at the application
+ * level, where possible.
+ *
+ * TODO we could add a queue-insertion policy that allows the developer to override
+ * how the message gets placed back into the queue.
+ */
+ final InVMEpr inVMEpr = entry.getInVMEpr() ;
+ final Object origMsgObject = entry.getMsgObject() ;
+ final Object msgObject = assertRedeliver(inVMEpr, origMsgObject) ;
+ if(msgObject != null) {
+ boolean problem = false;
+ try
+ {
+ deliverTx(inVMEpr, msgObject) ;
+ }
+ catch (final Exception ex)
+ {
+ _logger.debug("Unexpected exception received when delivering to the courier", ex) ;
+
+ problem = true;
+ }
+
+ if (problem) // shouldn't get here, but ...
+ {
+ _logger.warn("InVMXAResource could not rollback and put Message on to InVM queue!");
+
+ throw new XAException(XAException.XA_HEURHAZ);
+ }
+ } else {
+ // Send to the DLQ...
+ try {
+ deliverToDLQ(getMessage(inVMEpr, origMsgObject));
+ } catch (MessageDeliverException e) {
+ _logger.debug("Unexpected exception received when delivering to the courier", e) ;
+ throw new XAException(XAException.XA_HEURHAZ);
+ }
+ }
}
- catch (final Exception ex)
- {
- _logger.debug("Unexpected exception received when delivering to the courier", ex) ;
-
- problem = true;
- }
-
- if (problem) // shouldn't get here, but ...
- {
- _logger.warn("InVMXAResource could not rollback and put Message on to InVM queue!");
-
- throw new XAException(XAException.XA_HEURHAZ);
- }
- } else {
- // Send to the DLQ...
- try {
- deliverToDLQ(getMessage());
- } catch (MessageDeliverException e) {
- _logger.debug("Unexpected exception received when delivering to the courier", e) ;
- throw new XAException(XAException.XA_HEURHAZ);
- }
}
}
}
- protected void deliverTx()
+ protected void deliverTx(final InVMEpr inVMEpr, final Object msgObject)
throws Exception
{
if (inVMEpr.isTemporaryEPR())
@@ -237,8 +255,8 @@
{
}
- private boolean assertRedeliver() throws XAException {
- Message message = getMessage();
+ private Object assertRedeliver(final InVMEpr inVMEpr, final Object msgObject) throws XAException {
+ Message message = getMessage(inVMEpr, msgObject);
Integer retryCount = (Integer) message.getContext().getContext(INVM_RETRY_COUNT);
if(retryCount == null || retryCount < retryLimit) {
@@ -251,20 +269,21 @@
// Need to recreate the delivery object with the incremented
// redelivery count...
+ final Object result ;
try {
- msgObject = InVMTransport.toDeliveryObject(message, inVMEpr.getPassByValue());
+ result = InVMTransport.toDeliveryObject(message, inVMEpr.getPassByValue());
} catch (InVMException e) {
_logger.debug("Unexpected exception received when delivering to the courier", e) ;
throw new XAException(XAException.XA_HEURHAZ);
}
- return true;
+ return result;
} else {
- return false;
+ return null;
}
}
- private Message getMessage() throws XAException {
+ private Message getMessage(final InVMEpr inVMEpr, final Object msgObject) throws XAException {
Message message;
try {
message = InVMTransport.fromDeliveryObject(msgObject, inVMEpr.getPassByValue());
@@ -294,16 +313,59 @@
dlQueueInvoker.deliverAsync(message);
}
}
+
+ private void removeXAResource()
+ throws XAException
+ {
+ try
+ {
+ InVMResourceManager.getInstance().removeXAResource() ;
+ }
+ catch (final InVMException invme)
+ {
+ _logger.warn("InVMXAResource could not be disassociated from Resource Manager") ;
+ throw new XAException(XAException.XA_HEURHAZ) ;
+ }
+ }
public boolean isSameRM (XAResource xares) throws XAException
{
return (xares == this);
}
- private transient InVMEpr inVMEpr;
- private transient Object msgObject;
- private transient Operation _opcode;
+ private transient List<InVMXAResourceEntry> entries = new ArrayList<InVMXAResourceEntry>() ;
+
private transient int _timeout;
protected static final Logger _logger = Logger.getLogger(InVMXAResource.class);
+
+ private static final class InVMXAResourceEntry
+ {
+ private final InVMEpr inVMEpr ;
+ private final Object msgObject ;
+ private final Operation opcode ;
+
+ InVMXAResourceEntry(final InVMEpr inVMEpr, final Object msgObject, final Operation opcode)
+ {
+ this.inVMEpr = inVMEpr ;
+ this.msgObject = msgObject ;
+ this.opcode = opcode ;
+ }
+
+ InVMEpr getInVMEpr()
+ {
+ return inVMEpr ;
+ }
+
+ Object getMsgObject()
+ {
+ return msgObject ;
+ }
+
+ Operation getOpcode()
+ {
+ return opcode ;
+ }
+ }
+
}
\ No newline at end of file
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java 2012-05-30 04:17:28 UTC (rev 38111)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java 2012-05-30 17:38:01 UTC (rev 38112)
@@ -74,10 +74,10 @@
private boolean deliveredToDQL;
public MockInVMXAResource(final InVMEpr inVMEpr, final Object msgObject, final Operation op) {
- super(inVMEpr, msgObject, op);
+ addEntry(inVMEpr, msgObject, op);
}
- protected void deliverTx() throws Exception {
+ protected void deliverTx(final InVMEpr inVMEpr, final Object msgObject) throws Exception {
deliveryCount++;
}
More information about the jboss-svn-commits
mailing list