[jboss-svn-commits] JBL Code SVN: r38112 - in labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta: src/org/jboss/internal/soa/esb/couriers/tx and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed May 30 13:38:04 EDT 2012


Author: tcunning
Date: 2012-05-30 13:38:01 -0400 (Wed, 30 May 2012)
New Revision: 38112

Modified:
   labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java
   labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java
   labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java
   labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java
Log:
JBESB-3743
Commit Kevin's patch for a single XAResource in one transaction.


Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java	2012-05-30 04:17:28 UTC (rev 38111)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java	2012-05-30 17:38:01 UTC (rev 38112)
@@ -21,7 +21,6 @@
  */
 package org.jboss.internal.soa.esb.couriers.transport;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -38,13 +37,9 @@
 
 import org.apache.log4j.Logger;
 import org.jboss.internal.soa.esb.couriers.tx.InVMXAResource;
-import org.jboss.internal.soa.esb.message.format.MessageSerializer;
 import org.jboss.soa.esb.addressing.eprs.InVMEpr;
 import org.jboss.soa.esb.common.Environment;
 import org.jboss.soa.esb.common.ModulePropertyManager;
-import org.jboss.soa.esb.common.TransactionStrategy;
-import org.jboss.soa.esb.common.TransactionStrategyException;
-import org.jboss.soa.esb.message.ByReferenceMessage;
 import org.jboss.soa.esb.message.Message;
 
 import com.arjuna.common.util.propertyservice.PropertyManager;
@@ -134,15 +129,8 @@
              * Can't do lockstep wait here because otherwise the transaction may not terminate if this
              * is the transaction controller thread!
              */
-            final TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true) ;
-            try
-            {
-                txStrategy.enlistResource(new InVMXAResource(inVMEpr, addedObject, InVMXAResource.Operation.INSERT));
-            }
-            catch (final TransactionStrategyException tse)
-            {
-                throw new InVMException("Unexpected error enlisting transaction resource", tse) ;
-            }
+            final InVMXAResource resource = InVMResourceManager.getInstance().getXAResource() ;
+            resource.addEntry(inVMEpr, addedObject, InVMXAResource.Operation.INSERT);
         }
         else
         {
@@ -218,15 +206,8 @@
                     {
                         LOGGER.debug("Pickup enlisting transactional resource for service " + serviceId) ;
                     }
-                    final TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true) ;
-                    try
-                    {
-                        txStrategy.enlistResource(new InVMXAResource(inVMEpr, msgObject, InVMXAResource.Operation.REMOVE)) ;
-                    }
-                    catch (final TransactionStrategyException tse)
-                    {
-                        throw new InVMException("Unexpected error enlisting transaction resource", tse) ;
-                    }
+                    final InVMXAResource resource = InVMResourceManager.getInstance().getXAResource() ;
+                    resource.addEntry(inVMEpr, msgObject, InVMXAResource.Operation.REMOVE) ;
                 }
                 return message ;
             }

Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java	2012-05-30 04:17:28 UTC (rev 38111)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java	2012-05-30 17:38:01 UTC (rev 38112)
@@ -394,15 +394,8 @@
              * Can't do lockstep wait here because otherwise the transaction may not terminate if this
              * is the transaction controller thread!
              */
-            final TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true) ;
-            try
-            {
-                txStrategy.enlistResource(new InVMXAResource(inVMEpr, addedObject, InVMXAResource.Operation.INSERT));
-            }
-            catch (final TransactionStrategyException tse)
-            {
-                throw new InVMException("Unexpected error enlisting transaction resource", tse) ;
-            }
+            final InVMXAResource resource = InVMResourceManager.getInstance().getXAResource() ;
+            resource.addEntry(inVMEpr, addedObject, InVMXAResource.Operation.INSERT);
         }
         else
         {
@@ -463,15 +456,8 @@
                 {
                     LOGGER.debug("Pickup enlisting transactional resource for service " + serviceId) ;
                 }
-                final TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true) ;
-                try
-                {
-                    txStrategy.enlistResource(new InVMXAResource(inVMEpr, msgObject, InVMXAResource.Operation.REMOVE)) ;
-                }
-                catch (final TransactionStrategyException tse)
-                {
-                    throw new InVMException("Unexpected error enlisting transaction resource", tse) ;
-                }
+                final InVMXAResource resource = InVMResourceManager.getInstance().getXAResource() ;
+                resource.addEntry(inVMEpr, msgObject, InVMXAResource.Operation.REMOVE) ;
             }
             return message ;
         }

Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java	2012-05-30 04:17:28 UTC (rev 38111)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java	2012-05-30 17:38:01 UTC (rev 38112)
@@ -23,12 +23,15 @@
 package org.jboss.internal.soa.esb.couriers.tx;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
 import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.transport.InVMResourceManager;
 import org.jboss.internal.soa.esb.couriers.transport.InVMTemporaryTransport;
 import org.jboss.internal.soa.esb.couriers.transport.InVMTransport;
 import org.jboss.internal.soa.esb.couriers.transport.InVMException;
@@ -89,11 +92,9 @@
         }
     }
     
-    public InVMXAResource (final InVMEpr inVMEpr, final Object msgObject, final Operation op)
+    public void addEntry(final InVMEpr inVMEpr, final Object msgObject, final Operation op)
     {
-        this.inVMEpr = inVMEpr ;
-        this.msgObject = msgObject ;
-        _opcode = op;
+        entries.add(new InVMXAResourceEntry(inVMEpr, msgObject, op)) ;
     }
     
     /*
@@ -106,27 +107,34 @@
     
     public void commit (Xid xid, boolean onePhase) throws XAException
     {
-        if (_opcode == Operation.INSERT)
+        removeXAResource() ;
+        if (entries != null)
         {
-            boolean problem = false;
-            
-            try
+            for(InVMXAResourceEntry entry: entries)
             {
-                deliverTx() ;
+                if (entry.getOpcode() == Operation.INSERT)
+                {
+                    boolean problem = false;
+                    
+                    try
+                    {
+                        deliverTx(entry.getInVMEpr(), entry.getMsgObject()) ;
+                    }
+                    catch (final Exception ex)
+                    {
+                        _logger.debug("Unexpected exception received when delivering to the courier", ex) ;
+                        
+                        problem = true;  // oops!
+                    }
+                    
+                    if (problem)  // not a lot we can do at this stage
+                    {
+                        _logger.warn("InVMXAResource failed to commit to the InVM queue!");
+                        
+                        throw new XAException(XAException.XA_HEURHAZ);
+                    }
+                }
             }
-            catch (final Exception ex)
-            {
-                _logger.debug("Unexpected exception received when delivering to the courier", ex) ;
-                
-                problem = true;  // oops!
-            }
-            
-            if (problem)  // not a lot we can do at this stage
-            {
-                _logger.warn("InVMXAResource failed to commit to the InVM queue!");
-                
-                throw new XAException(XAException.XA_HEURHAZ);
-            }
         }
     }
 
@@ -168,52 +176,62 @@
     
     public void rollback (Xid xid) throws XAException
     {
-        if (_opcode == Operation.REMOVE) // put the message back on the queue
+        removeXAResource() ;
+        if (entries != null)
         {
-            /*
-             * The message goes back on the queue. This may not be the same
-             * location as it had previously, but any attempt to do a truly
-             * transactional queue will affect performance adversely, for relatively
-             * little benefit. In an asynchronous world, applications should not
-             * be written assuming that a queue (or any transport) guarantees ordering.
-             * The ordering (or lack thereof) should be dealt with at the application
-             * level, where possible.
-             * 
-             * TODO we could add a queue-insertion policy that allows the developer to override
-             * how the message gets placed back into the queue.
-             */
-            if(assertRedeliver()) {
-                boolean problem = false;
-                try
+            for(InVMXAResourceEntry entry: entries)
+            {
+                if (entry.getOpcode() == Operation.REMOVE) // put the message back on the queue
                 {
-                    deliverTx() ;
+                    /*
+                     * The message goes back on the queue. This may not be the same
+                     * location as it had previously, but any attempt to do a truly
+                     * transactional queue will affect performance adversely, for relatively
+                     * little benefit. In an asynchronous world, applications should not
+                     * be written assuming that a queue (or any transport) guarantees ordering.
+                     * The ordering (or lack thereof) should be dealt with at the application
+                     * level, where possible.
+                     * 
+                     * TODO we could add a queue-insertion policy that allows the developer to override
+                     * how the message gets placed back into the queue.
+                     */
+                    final InVMEpr inVMEpr = entry.getInVMEpr() ;
+                    final Object origMsgObject = entry.getMsgObject() ;
+                    final Object msgObject = assertRedeliver(inVMEpr, origMsgObject) ;
+                    if(msgObject != null) {
+                        boolean problem = false;
+                        try
+                        {
+                            deliverTx(inVMEpr, msgObject) ;
+                        }
+                        catch (final Exception ex)
+                        {
+                            _logger.debug("Unexpected exception received when delivering to the courier", ex) ;
+        
+                            problem = true;
+                        }
+        
+                        if (problem)  // shouldn't get here, but ...
+                        {
+                            _logger.warn("InVMXAResource could not rollback and put Message on to InVM queue!");
+        
+                            throw new XAException(XAException.XA_HEURHAZ);
+                        }
+                    } else {
+                        // Send to the DLQ...
+                        try {
+                            deliverToDLQ(getMessage(inVMEpr, origMsgObject));
+                        } catch (MessageDeliverException e) {
+                            _logger.debug("Unexpected exception received when delivering to the courier", e) ;
+                            throw new XAException(XAException.XA_HEURHAZ);
+                        }
+                    }
                 }
-                catch (final Exception ex)
-                {
-                    _logger.debug("Unexpected exception received when delivering to the courier", ex) ;
-
-                    problem = true;
-                }
-
-                if (problem)  // shouldn't get here, but ...
-                {
-                    _logger.warn("InVMXAResource could not rollback and put Message on to InVM queue!");
-
-                    throw new XAException(XAException.XA_HEURHAZ);
-                }
-            } else {
-                // Send to the DLQ...
-                try {
-                    deliverToDLQ(getMessage());
-                } catch (MessageDeliverException e) {
-                    _logger.debug("Unexpected exception received when delivering to the courier", e) ;
-                    throw new XAException(XAException.XA_HEURHAZ);
-                }
             }
         }
     }
 
-    protected void deliverTx()
+    protected void deliverTx(final InVMEpr inVMEpr, final Object msgObject)
         throws Exception
     {
         if (inVMEpr.isTemporaryEPR())
@@ -237,8 +255,8 @@
     {
     }
 
-    private boolean assertRedeliver() throws XAException {
-        Message message = getMessage();
+    private Object assertRedeliver(final InVMEpr inVMEpr, final Object msgObject) throws XAException {
+        Message message = getMessage(inVMEpr, msgObject);
         Integer retryCount = (Integer) message.getContext().getContext(INVM_RETRY_COUNT);
 
         if(retryCount == null || retryCount < retryLimit) {
@@ -251,20 +269,21 @@
 
             // Need to recreate the delivery object with the incremented
             // redelivery count...
+            final Object result ;
             try {
-                msgObject = InVMTransport.toDeliveryObject(message, inVMEpr.getPassByValue());
+                result = InVMTransport.toDeliveryObject(message, inVMEpr.getPassByValue());
             } catch (InVMException e) {
                 _logger.debug("Unexpected exception received when delivering to the courier", e) ;
                 throw new XAException(XAException.XA_HEURHAZ);
             }
             
-            return true;
+            return result;
         } else {
-            return false;
+            return null;
         }
     }
 
-    private Message getMessage() throws XAException {
+    private Message getMessage(final InVMEpr inVMEpr, final Object msgObject) throws XAException {
         Message message;
         try {
             message = InVMTransport.fromDeliveryObject(msgObject, inVMEpr.getPassByValue());
@@ -294,16 +313,59 @@
             dlQueueInvoker.deliverAsync(message);
         }
     }
+    
+    private void removeXAResource()
+        throws XAException
+    {
+        try
+        {
+            InVMResourceManager.getInstance().removeXAResource() ;
+        }
+        catch (final InVMException invme)
+        {
+            _logger.warn("InVMXAResource could not be disassociated from Resource Manager") ;
+            throw new XAException(XAException.XA_HEURHAZ) ;
+        }
+    }
 
     public boolean isSameRM (XAResource xares) throws XAException
    {
        return (xares == this);
    }
 
-    private transient InVMEpr inVMEpr;
-    private transient Object msgObject;
-    private transient Operation _opcode;
+    private transient List<InVMXAResourceEntry> entries = new ArrayList<InVMXAResourceEntry>() ;
+
     private transient int _timeout;
     
     protected static final Logger _logger = Logger.getLogger(InVMXAResource.class);
+    
+    private static final class InVMXAResourceEntry
+    {
+        private final InVMEpr inVMEpr ;
+        private final Object msgObject ;
+        private final Operation opcode ;
+        
+        InVMXAResourceEntry(final InVMEpr inVMEpr, final Object msgObject, final Operation opcode)
+        {
+            this.inVMEpr = inVMEpr ;
+            this.msgObject = msgObject ;
+            this.opcode = opcode ;
+        }
+        
+        InVMEpr getInVMEpr()
+        {
+            return inVMEpr ;
+        }
+        
+        Object getMsgObject()
+        {
+            return msgObject ;
+        }
+        
+        Operation getOpcode()
+        {
+            return opcode ;
+        }
+    }
+
 }
\ No newline at end of file

Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java	2012-05-30 04:17:28 UTC (rev 38111)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java	2012-05-30 17:38:01 UTC (rev 38112)
@@ -74,10 +74,10 @@
         private boolean deliveredToDQL;
 
         public MockInVMXAResource(final InVMEpr inVMEpr, final Object msgObject, final Operation op) {
-            super(inVMEpr, msgObject, op);
+            addEntry(inVMEpr, msgObject, op);
         }
 
-        protected void deliverTx() throws Exception {
+        protected void deliverTx(final InVMEpr inVMEpr, final Object msgObject) throws Exception {
             deliveryCount++;
         }
 



More information about the jboss-svn-commits mailing list