[jboss-svn-commits] JBL Code SVN: r29539 - in labs/jbossesb/trunk/product/rosetta: src/org/jboss/internal/soa/esb/couriers/tx and 3 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue Oct 6 11:32:25 EDT 2009


Author: tfennelly
Date: 2009-10-06 11:32:25 -0400 (Tue, 06 Oct 2009)
New Revision: 29539

Added:
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java
Modified:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
Log:
https://jira.jboss.org/jira/browse/JBESB-2866
Add a max resend (retry limit) attribute for the transactional InVM transport

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java	2009-10-06 11:50:43 UTC (rev 29538)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java	2009-10-06 15:32:25 UTC (rev 29539)
@@ -121,26 +121,8 @@
         {
             LOGGER.debug("Delivering message to " + serviceId) ;
         }
-        final Object addedObject ;
-        if (inVMEpr.getPassByValue())
-        {
-            try
-            {
-                addedObject = MessageSerializer.serialize(message) ;
-            }
-            catch (final IOException ex)
-            {
-                throw new InVMException("Could not serialize message to pass by value.", ex) ;
-            }
-        }
-        else if (message instanceof ByReferenceMessage)
-        {
-            addedObject = ((ByReferenceMessage)message).reference() ;
-        }
-        else
-        {
-            addedObject = message ;
-        }
+
+        final Object addedObject = InVMTransport.toDeliveryObject(message, inVMEpr.getPassByValue());
         
         if (InVMTransport.isTransactional())
         {
@@ -220,28 +202,9 @@
             final Object msgObject = queueEntry.getValue() ;
             if (msgObject != null)
             {
-                final Message message ;
-                try
-                {
-                    if (msgObject instanceof byte[])
-                    {
-                        message = MessageSerializer.deserialize((byte[])msgObject) ;
-                    }
-                    else if (inVMEpr.getPassByValue())
-                    {
-                        // pass by reference but now expecting value.
-                        message = ((Message)msgObject).copy() ;
-                    }
-                    else
-                    {
-                        message = (Message)msgObject ;
-                    }
-                }
-                catch (final IOException ioe)
-                {
-                    throw new InVMException("Failed to deserialise incoming message", ioe) ;
-                }
-                
+
+                final Message message = InVMTransport.fromDeliveryObject(msgObject, inVMEpr.getPassByValue());
+
                 if (InVMTransport.isTransactional())
                 {
                     /*

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java	2009-10-06 11:50:43 UTC (rev 29538)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java	2009-10-06 15:32:25 UTC (rev 29539)
@@ -381,28 +381,9 @@
         {
             releaseReadLock() ;
         }
+
+        final Object addedObject = toDeliveryObject(message, passByValue);
         
-        final Object addedObject ;
-        if (passByValue)
-        {
-            try
-            {
-                addedObject = MessageSerializer.serialize(message) ;
-            }
-            catch (final IOException ex)
-            {
-                throw new InVMException("Could not serialize message to pass by value.", ex) ;
-            }
-        }
-        else if (message instanceof ByReferenceMessage)
-        {
-            addedObject = ((ByReferenceMessage)message).reference() ;
-        }
-        else
-        {
-            addedObject = message ;
-        }
-        
         if (isTransactional())
         {
             if (LOGGER.isDebugEnabled())
@@ -458,7 +439,7 @@
         {
             throw new InVMException("Could not locate service entry for epr " + inVMEpr) ;
         }
-        
+
         final Object msgObject = entry.pickup(millis) ;
         if (msgObject != null)
         {
@@ -466,28 +447,9 @@
             {
                 LOGGER.debug("Pickup of message from " + serviceId) ;
             }
-            final Message message ;
-            try
-            {
-                if (msgObject instanceof byte[])
-                {
-                    message = MessageSerializer.deserialize((byte[])msgObject) ;
-                }
-                else if (inVMEpr.getPassByValue())
-                {
-                    // pass by reference but now expecting value.
-                    message = ((Message)msgObject).copy() ;
-                }
-                else
-                {
-                    message = (Message)msgObject ;
-                }
-            }
-            catch (final IOException ioe)
-            {
-                throw new InVMException("Failed to deserialise incoming message", ioe) ;
-            }
-            
+
+            final Message message = fromDeliveryObject(msgObject, inVMEpr.getPassByValue());
+
             if (isTransactional())
             {
                 /*
@@ -520,7 +482,7 @@
      * Deliver an object as a consequence of a transaction.  This will either be a
      * rollback, placing the object back on the source queue, or a commit delivering
      * to a target queue.
-     * 
+     *
      * @param inVMEpr The EPR to receive the message.
      * @param msgObject The object to deliver.
      * @throws InVMException for InVM transport specific errors.
@@ -543,7 +505,7 @@
         {
             throw new InVMException("Could not locate service entry for epr " + inVMEpr) ;
         }
-        
+
         if (LOGGER.isDebugEnabled())
         {
             LOGGER.debug("Transactional redelivery of message to " + serviceId) ;
@@ -552,6 +514,69 @@
     }
 
     /**
+     * Encode a message to an Object ready for delivery.
+     * @param message The message to be encoded.
+     * @param passByValue If the message will be delivered by value (as opposed to by reference).
+     * @return The delivery Object: the serialized form when passing by value, otherwise the message itself (or its {@code reference()} if it is a {@code ByReferenceMessage}).
+     * @throws InVMException Error encoding message (serializing for pass-by-value failed with an IOException).
+     */
+    public static Object toDeliveryObject(Message message, boolean passByValue) throws InVMException {
+        final Object object ;
+        if (passByValue)
+        {
+            try
+            {
+                object = MessageSerializer.serialize(message) ;
+            }
+            catch (final IOException ex)
+            {
+                throw new InVMException("Could not serialize message to pass by value.", ex) ;
+            }
+        }
+        else if (message instanceof ByReferenceMessage)
+        {
+            object = ((ByReferenceMessage)message).reference() ;
+        }
+        else
+        {
+            object = message ;
+        }
+        return object;
+    }
+
+    /**
+     * Decode a delivery Object instance back to an ESB {@link Message} object instance.
+     * @param msgObject The delivery Object to be decoded.
+     * @param passByValue If the message was delivered by value (as opposed to by reference).
+     * @return The ESB Message Object instance: deserialized from a {@code byte[]}, a {@code copy()} of the delivered Message when passByValue is set, or the delivered Message itself.
+     * @throws InVMException Error decoding message (deserialization failed with an IOException).
+     */
+    public static Message fromDeliveryObject(Object msgObject, boolean passByValue) throws InVMException {
+        final Message message ;
+        try
+        {
+            if (msgObject instanceof byte[])
+            {
+                message = MessageSerializer.deserialize((byte[])msgObject) ;
+            }
+            else if (passByValue)
+            {
+                // Delivered by reference, but by-value semantics are now expected, so return a copy.
+                message = ((Message)msgObject).copy() ;
+            }
+            else
+            {
+                message = (Message)msgObject ;
+            }
+        }
+        catch (final IOException ioe)
+        {
+            throw new InVMException("Failed to deserialise incoming message", ioe) ;
+        }
+        return message;
+    }
+
+    /**
      * Acquire a read lock for accessing the data.
      */
     private void acquireReadLock()

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java	2009-10-06 11:50:43 UTC (rev 29538)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java	2009-10-06 15:32:25 UTC (rev 29539)
@@ -31,7 +31,14 @@
 import org.apache.log4j.Logger;
 import org.jboss.internal.soa.esb.couriers.transport.InVMTemporaryTransport;
 import org.jboss.internal.soa.esb.couriers.transport.InVMTransport;
+import org.jboss.internal.soa.esb.couriers.transport.InVMException;
 import org.jboss.soa.esb.addressing.eprs.InVMEpr;
+import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.Configuration;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.client.ServiceInvoker;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
 
 /**
  * This XAResource instance controls the InVM queue manipulation under the control
@@ -54,12 +61,33 @@
 
 public class InVMXAResource implements XAResource, Serializable
 {
+
+    public static final String INVM_RETRY_COUNT = "org.jboss.soa.esb.invm.retry.count";
+
     /**
      * Serial version UID for this class.
      */
     private static final long serialVersionUID = 77430212548543969L;
+    /**
+     * Redelivery retry limit.
+     */
+    private static int retryLimit;
+    /**
+     * Dead letter channel ServiceInvoker. Messages are delivered to the DLQ after the retry limit for a
+     * failed message has been exceeded.
+     */
+    private static ServiceInvoker dlQueueInvoker;
 
     public enum operation { INSERT, REMOVE };
+
+    static {  // Read the redelivery retry limit (Environment.INVM_RETRY_LIMIT) from the transports module properties; "5" is used when the property is absent.
+        String retryLimitConfig = ModulePropertyManager.getPropertyManager(ModulePropertyManager.TRANSPORTS_MODULE).getProperty(Environment.INVM_RETRY_LIMIT, "5").trim();
+        try {
+            retryLimit = Integer.parseInt(retryLimitConfig);
+        } catch (NumberFormatException e) {  // Configured value is not an integer: fall back to 5. NOTE(review): the bad value is not logged.
+            retryLimit = 5;
+        }
+    }
     
     public InVMXAResource (final InVMEpr inVMEpr, final Object msgObject, final operation op)
     {
@@ -154,30 +182,38 @@
              * TODO we could add a queue-insertion policy that allows the developer to override
              * how the message gets placed back into the queue.
              */
-            
-            boolean problem = false;
-            
-            try
-            {
-                deliverTx() ;
+            if(assertRedeliver()) {
+                boolean problem = false;
+                try
+                {
+                    deliverTx() ;
+                }
+                catch (final Exception ex)
+                {
+                    _logger.debug("Unexpected exception received when delivering to the courier", ex) ;
+
+                    problem = true;
+                }
+
+                if (problem)  // shouldn't get here, but ...
+                {
+                    _logger.warn("InVMXAResource could not rollback and put Message on to InVM queue!");
+
+                    throw new XAException(XAException.XA_HEURHAZ);
+                }
+            } else {
+                // Send to the DLQ...
+                try {
+                    deliverToDLQ(getMessage());
+                } catch (MessageDeliverException e) {
+                    _logger.debug("Unexpected exception received when delivering to the courier", e) ;
+                    throw new XAException(XAException.XA_HEURHAZ);
+                }
             }
-            catch (final Exception ex)
-            {
-                _logger.debug("Unexpected exception received when delivering to the courier", ex) ;
-                
-                problem = true;
-            }
-            
-            if (problem)  // shouldn't get here, but ...
-            {
-                _logger.warn("InVMXAResource could not rollback and put Message on to InVM queue!");
-                
-                throw new XAException(XAException.XA_HEURHAZ);
-            }
         }
     }
 
-    private void deliverTx()
+    protected void deliverTx()
         throws Exception
     {
         if (inVMEpr.isTemporaryEPR())
@@ -201,6 +237,66 @@
     {
     }
 
+    private boolean assertRedeliver() throws XAException {  // Returns true (after bumping the message's retry count) while the count is below retryLimit; false once the limit has been reached.
+        Message message = getMessage();
+        Integer retryCount = (Integer) message.getContext().getContext(INVM_RETRY_COUNT);  // null when the message has not been redelivered before
+
+        if(retryCount == null || retryCount < retryLimit) {
+            // Still under the limit: record this redelivery in the message context (first redelivery is stored as 1)...
+            if(retryCount == null) {
+                message.getContext().setContext(INVM_RETRY_COUNT, 1);
+            } else {
+                message.getContext().setContext(INVM_RETRY_COUNT, retryCount + 1);
+            }
+
+            // Re-encode the delivery object so that the incremented redelivery
+            // count travels with the msgObject that is put back on the queue...
+            try {
+                msgObject = InVMTransport.toDeliveryObject(message, inVMEpr.getPassByValue());
+            } catch (InVMException e) {
+                _logger.debug("Unexpected exception received when delivering to the courier", e) ;
+                throw new XAException(XAException.XA_HEURHAZ);
+            }
+            
+            return true;
+        } else {  // Retry limit reached: the caller (rollback) sends the message to the DLQ instead of redelivering.
+            return false;
+        }
+    }
+
+    private Message getMessage() throws XAException {  // Decodes this resource's msgObject back into an ESB Message using the EPR's pass-by-value setting.
+        Message message;
+        try {
+            message = InVMTransport.fromDeliveryObject(msgObject, inVMEpr.getPassByValue());
+        } catch (InVMException e) {  // Decode failures are logged at debug level and reported to the transaction manager as XA_HEURHAZ.
+            _logger.debug("Unexpected exception received when delivering to the courier", e) ;
+            throw new XAException(XAException.XA_HEURHAZ);
+        }
+        return message;
+    }
+
+    /**
+     * Deliver a message to the Dead Letter Channel Service. If {@code Configuration.getRedeliveryDlsOn()} is not "true", the message is not delivered and only a debug message is logged.
+     *
+     * @param message The message to be delivered to the dead letter channel.
+     * @throws org.jboss.soa.esb.listeners.message.MessageDeliverException Message delivery failure.
+     */
+    protected void deliverToDLQ(Message message) throws MessageDeliverException {
+        if (!"true".equalsIgnoreCase(Configuration.getRedeliveryDlsOn())) {
+            _logger.debug("org.jboss.soa.esb.dls.redeliver is turned off");
+        } else {
+            if (dlQueueInvoker == null) {  // Lazily create the shared static invoker on first use. NOTE(review): the dlQueueInvoker field is not volatile - confirm this check/lock/re-check initialisation is safe.
+                synchronized (ServiceInvoker.dlqService) {
+                    if (dlQueueInvoker == null) {
+                        dlQueueInvoker = new ServiceInvoker(ServiceInvoker.dlqService);
+                    }
+                }
+            }
+
+            dlQueueInvoker.deliverAsync(message);
+        }
+    }
+
     public boolean isSameRM (XAResource xares) throws XAException
    {
        return (xares == this);

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java	2009-10-06 11:50:43 UTC (rev 29538)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java	2009-10-06 15:32:25 UTC (rev 29539)
@@ -41,6 +41,7 @@
 
     public static final String DEFAULT_INVM_SCOPE     = "jboss.esb.invm.scope.default";
     public static final String INVM_EXPIRY_TIME = "org.jboss.soa.esb.invm.expiryTime";
+    public static final String INVM_RETRY_LIMIT = "org.jboss.soa.esb.invm.retry.limit";
 
     public static final String SMTP_HOST     = "org.jboss.soa.esb.mail.smtp.host";
         public static final String SMTP_USERNAME = "org.jboss.soa.esb.mail.smtp.user";

Added: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java	2009-10-06 15:32:25 UTC (rev 29539)
@@ -0,0 +1,88 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.internal.soa.esb.couriers.tx;
+
+import junit.framework.TestCase;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.addressing.eprs.InVMEpr;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.internal.soa.esb.couriers.transport.InVMTransport;
+
+import java.net.URI;
+
+/**
+ * Test for https://jira.jboss.org/jira/browse/JBESB-2866: a rolled-back InVM message is redelivered until the retry limit is reached, after which it is sent to the DLQ.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JBESB_2866_UnitTest extends TestCase {
+
+    public void test_passByVal() throws Throwable {
+        test(true);
+    }
+
+    public void test_passByRef() throws Throwable {
+        test(false);
+    }
+
+    public void test(boolean passByVal) throws Throwable {  // Shared scenario, run once for pass-by-value and once for pass-by-reference.
+        InVMEpr epr = new InVMEpr(new URI("invm://service"));
+        Message message = MessageFactory.getInstance().getMessage();
+
+        epr.setPassByValue(passByVal);
+        MockInVMXAResource resource = new MockInVMXAResource(epr, InVMTransport.toDeliveryObject(message, epr.getPassByValue()), InVMXAResource.operation.REMOVE);
+
+        // In reality, a new XAResource would be created for each retry, but it's the underlying
+        // message that matters, so we just need to create 1 instance of MockInVMXAResource and
+        // retry the rollback multiple times...
+        resource.rollback(null);
+        resource.rollback(null);
+        resource.rollback(null);
+        resource.rollback(null);
+        resource.rollback(null);
+        assertEquals(5, resource.deliveryCount);
+        assertFalse(resource.deliveredToDQL); // not set yet
+
+        // The retry limit (default 5) has been hit now.  The next attempt should result
+        // in the message being sent to the DLQ...
+        resource.rollback(null);
+        assertEquals(5, resource.deliveryCount); // Same as last time
+        assertTrue(resource.deliveredToDQL); // set now
+    }
+
+    private class MockInVMXAResource extends InVMXAResource {  // Test double: records redeliveries and DLQ hand-off instead of touching a real queue or service.
+
+        private int deliveryCount;  // number of times deliverTx() was invoked
+        private boolean deliveredToDQL;  // set once deliverToDLQ() is invoked. NOTE(review): "DQL" looks like a transposition of "DLQ".
+
+        public MockInVMXAResource(final InVMEpr inVMEpr, final Object msgObject, final operation op) {
+            super(inVMEpr, msgObject, op);
+        }
+
+        protected void deliverTx() throws Exception {  // Overrides the real redelivery; only counts the call.
+            deliveryCount++;
+        }
+
+        protected void deliverToDLQ(Message message) throws MessageDeliverException {  // Overrides the real DLQ delivery; only records that it happened.
+            deliveredToDQL = true;
+        }
+    }
+}



More information about the jboss-svn-commits mailing list