[jboss-svn-commits] JBL Code SVN: r29539 - in labs/jbossesb/trunk/product/rosetta: src/org/jboss/internal/soa/esb/couriers/tx and 3 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue Oct 6 11:32:25 EDT 2009
Author: tfennelly
Date: 2009-10-06 11:32:25 -0400 (Tue, 06 Oct 2009)
New Revision: 29539
Added:
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java
Modified:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
Log:
https://jira.jboss.org/jira/browse/JBESB-2866
Add a max resend attribute for transactional invm transport
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java 2009-10-06 11:50:43 UTC (rev 29538)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTemporaryTransport.java 2009-10-06 15:32:25 UTC (rev 29539)
@@ -121,26 +121,8 @@
{
LOGGER.debug("Delivering message to " + serviceId) ;
}
- final Object addedObject ;
- if (inVMEpr.getPassByValue())
- {
- try
- {
- addedObject = MessageSerializer.serialize(message) ;
- }
- catch (final IOException ex)
- {
- throw new InVMException("Could not serialize message to pass by value.", ex) ;
- }
- }
- else if (message instanceof ByReferenceMessage)
- {
- addedObject = ((ByReferenceMessage)message).reference() ;
- }
- else
- {
- addedObject = message ;
- }
+
+ final Object addedObject = InVMTransport.toDeliveryObject(message, inVMEpr.getPassByValue());
if (InVMTransport.isTransactional())
{
@@ -220,28 +202,9 @@
final Object msgObject = queueEntry.getValue() ;
if (msgObject != null)
{
- final Message message ;
- try
- {
- if (msgObject instanceof byte[])
- {
- message = MessageSerializer.deserialize((byte[])msgObject) ;
- }
- else if (inVMEpr.getPassByValue())
- {
- // pass by reference but now expecting value.
- message = ((Message)msgObject).copy() ;
- }
- else
- {
- message = (Message)msgObject ;
- }
- }
- catch (final IOException ioe)
- {
- throw new InVMException("Failed to deserialise incoming message", ioe) ;
- }
-
+
+ final Message message = InVMTransport.fromDeliveryObject(msgObject, inVMEpr.getPassByValue());
+
if (InVMTransport.isTransactional())
{
/*
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java 2009-10-06 11:50:43 UTC (rev 29538)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/transport/InVMTransport.java 2009-10-06 15:32:25 UTC (rev 29539)
@@ -381,28 +381,9 @@
{
releaseReadLock() ;
}
+
+ final Object addedObject = toDeliveryObject(message, passByValue);
- final Object addedObject ;
- if (passByValue)
- {
- try
- {
- addedObject = MessageSerializer.serialize(message) ;
- }
- catch (final IOException ex)
- {
- throw new InVMException("Could not serialize message to pass by value.", ex) ;
- }
- }
- else if (message instanceof ByReferenceMessage)
- {
- addedObject = ((ByReferenceMessage)message).reference() ;
- }
- else
- {
- addedObject = message ;
- }
-
if (isTransactional())
{
if (LOGGER.isDebugEnabled())
@@ -458,7 +439,7 @@
{
throw new InVMException("Could not locate service entry for epr " + inVMEpr) ;
}
-
+
final Object msgObject = entry.pickup(millis) ;
if (msgObject != null)
{
@@ -466,28 +447,9 @@
{
LOGGER.debug("Pickup of message from " + serviceId) ;
}
- final Message message ;
- try
- {
- if (msgObject instanceof byte[])
- {
- message = MessageSerializer.deserialize((byte[])msgObject) ;
- }
- else if (inVMEpr.getPassByValue())
- {
- // pass by reference but now expecting value.
- message = ((Message)msgObject).copy() ;
- }
- else
- {
- message = (Message)msgObject ;
- }
- }
- catch (final IOException ioe)
- {
- throw new InVMException("Failed to deserialise incoming message", ioe) ;
- }
-
+
+ final Message message = fromDeliveryObject(msgObject, inVMEpr.getPassByValue());
+
if (isTransactional())
{
/*
@@ -520,7 +482,7 @@
* Deliver an object as a consequence of a transaction. This will either be a
* rollback, placing the object back on the source queue, or a commit delivering
* to a target queue.
- *
+ *
* @param inVMEpr The EPR to receive the message.
* @param msgObject The object to deliver.
* @throws InVMException for InVM transport specific errors.
@@ -543,7 +505,7 @@
{
throw new InVMException("Could not locate service entry for epr " + inVMEpr) ;
}
-
+
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Transactional redelivery of message to " + serviceId) ;
@@ -552,6 +514,69 @@
}
/**
+ * Encode a message to an Object ready for delivery.
+ * @param message The message to be encoded.
+ * @param passByValue If the message will be delivered by value (as opposed to by reference).
+ * @return The delivery Object.
+ * @throws InVMException Error encoding message.
+ */
+ public static Object toDeliveryObject(Message message, boolean passByValue) throws InVMException {
+ final Object object ;
+ if (passByValue)
+ {
+ try
+ {
+ object = MessageSerializer.serialize(message) ;
+ }
+ catch (final IOException ex)
+ {
+ throw new InVMException("Could not serialize message to pass by value.", ex) ;
+ }
+ }
+ else if (message instanceof ByReferenceMessage)
+ {
+ object = ((ByReferenceMessage)message).reference() ;
+ }
+ else
+ {
+ object = message ;
+ }
+ return object;
+ }
+
+ /**
+ * Decode a delivery Object instance back to an ESB {@link Message} object instance.
+ * @param msgObject The delivery Object to be decoded.
+ * @param passByValue If the message was delivered by value (as opposed to by reference).
+ * @return The ESB Message Object instance.
+ * @throws InVMException Error decoding message.
+ */
+ public static Message fromDeliveryObject(Object msgObject, boolean passByValue) throws InVMException {
+ final Message message ;
+ try
+ {
+ if (msgObject instanceof byte[])
+ {
+ message = MessageSerializer.deserialize((byte[])msgObject) ;
+ }
+ else if (passByValue)
+ {
+ // pass by reference but now expecting value.
+ message = ((Message)msgObject).copy() ;
+ }
+ else
+ {
+ message = (Message)msgObject ;
+ }
+ }
+ catch (final IOException ioe)
+ {
+ throw new InVMException("Failed to deserialise incoming message", ioe) ;
+ }
+ return message;
+ }
+
+ /**
* Acquire a read lock for accessing the data.
*/
private void acquireReadLock()
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java 2009-10-06 11:50:43 UTC (rev 29538)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java 2009-10-06 15:32:25 UTC (rev 29539)
@@ -31,7 +31,14 @@
import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.couriers.transport.InVMTemporaryTransport;
import org.jboss.internal.soa.esb.couriers.transport.InVMTransport;
+import org.jboss.internal.soa.esb.couriers.transport.InVMException;
import org.jboss.soa.esb.addressing.eprs.InVMEpr;
+import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.Configuration;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.client.ServiceInvoker;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
/**
* This XAResource instance controls the InVM queue manipulation under the control
@@ -54,12 +61,33 @@
public class InVMXAResource implements XAResource, Serializable
{
+
+ public static final String INVM_RETRY_COUNT = "org.jboss.soa.esb.invm.retry.count";
+
/**
* Serial version UID for this class.
*/
private static final long serialVersionUID = 77430212548543969L;
+ /**
+ * Redelivery retry limit.
+ */
+ private static int retryLimit;
+ /**
+ * Dead letter channel ServiceInvoker. Messages are delivered to the DLQ after the retry limit for a
+ * failed message has been exceeded.
+ */
+ private static ServiceInvoker dlQueueInvoker;
public enum operation { INSERT, REMOVE };
+
+ static {
+ String retryLimitConfig = ModulePropertyManager.getPropertyManager(ModulePropertyManager.TRANSPORTS_MODULE).getProperty(Environment.INVM_RETRY_LIMIT, "5").trim();
+ try {
+ retryLimit = Integer.parseInt(retryLimitConfig);
+ } catch (NumberFormatException e) {
+ retryLimit = 5;
+ }
+ }
public InVMXAResource (final InVMEpr inVMEpr, final Object msgObject, final operation op)
{
@@ -154,30 +182,38 @@
* TODO we could add a queue-insertion policy that allows the developer to override
* how the message gets placed back into the queue.
*/
-
- boolean problem = false;
-
- try
- {
- deliverTx() ;
+ if(assertRedeliver()) {
+ boolean problem = false;
+ try
+ {
+ deliverTx() ;
+ }
+ catch (final Exception ex)
+ {
+ _logger.debug("Unexpected exception received when delivering to the courier", ex) ;
+
+ problem = true;
+ }
+
+ if (problem) // shouldn't get here, but ...
+ {
+ _logger.warn("InVMXAResource could not rollback and put Message on to InVM queue!");
+
+ throw new XAException(XAException.XA_HEURHAZ);
+ }
+ } else {
+ // Send to the DLQ...
+ try {
+ deliverToDLQ(getMessage());
+ } catch (MessageDeliverException e) {
+ _logger.debug("Unexpected exception received when delivering to the courier", e) ;
+ throw new XAException(XAException.XA_HEURHAZ);
+ }
}
- catch (final Exception ex)
- {
- _logger.debug("Unexpected exception received when delivering to the courier", ex) ;
-
- problem = true;
- }
-
- if (problem) // shouldn't get here, but ...
- {
- _logger.warn("InVMXAResource could not rollback and put Message on to InVM queue!");
-
- throw new XAException(XAException.XA_HEURHAZ);
- }
}
}
- private void deliverTx()
+ protected void deliverTx()
throws Exception
{
if (inVMEpr.isTemporaryEPR())
@@ -201,6 +237,66 @@
{
}
+ private boolean assertRedeliver() throws XAException {
+ Message message = getMessage();
+ Integer retryCount = (Integer) message.getContext().getContext(INVM_RETRY_COUNT);
+
+ if(retryCount == null || retryCount < retryLimit) {
+ // Increment the retry count...
+ if(retryCount == null) {
+ message.getContext().setContext(INVM_RETRY_COUNT, 1);
+ } else {
+ message.getContext().setContext(INVM_RETRY_COUNT, retryCount + 1);
+ }
+
+ // Need to recreate the delivery object with the incremented
+ // redelivery count...
+ try {
+ msgObject = InVMTransport.toDeliveryObject(message, inVMEpr.getPassByValue());
+ } catch (InVMException e) {
+ _logger.debug("Unexpected exception received when delivering to the courier", e) ;
+ throw new XAException(XAException.XA_HEURHAZ);
+ }
+
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private Message getMessage() throws XAException {
+ Message message;
+ try {
+ message = InVMTransport.fromDeliveryObject(msgObject, inVMEpr.getPassByValue());
+ } catch (InVMException e) {
+ _logger.debug("Unexpected exception received when delivering to the courier", e) ;
+ throw new XAException(XAException.XA_HEURHAZ);
+ }
+ return message;
+ }
+
+ /**
+ * Deliver a message to the Dead Letter Channel Service.
+ *
+ * @param message The message to be delivered to the dead letter channel.
+ * @throws org.jboss.soa.esb.listeners.message.MessageDeliverException Message delivery failure.
+ */
+ protected void deliverToDLQ(Message message) throws MessageDeliverException {
+ if (!"true".equalsIgnoreCase(Configuration.getRedeliveryDlsOn())) {
+ _logger.debug("org.jboss.soa.esb.dls.redeliver is turned off");
+ } else {
+ if (dlQueueInvoker == null) {
+ synchronized (ServiceInvoker.dlqService) {
+ if (dlQueueInvoker == null) {
+ dlQueueInvoker = new ServiceInvoker(ServiceInvoker.dlqService);
+ }
+ }
+ }
+
+ dlQueueInvoker.deliverAsync(message);
+ }
+ }
+
public boolean isSameRM (XAResource xares) throws XAException
{
return (xares == this);
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java 2009-10-06 11:50:43 UTC (rev 29538)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java 2009-10-06 15:32:25 UTC (rev 29539)
@@ -41,6 +41,7 @@
public static final String DEFAULT_INVM_SCOPE = "jboss.esb.invm.scope.default";
public static final String INVM_EXPIRY_TIME = "org.jboss.soa.esb.invm.expiryTime";
+ public static final String INVM_RETRY_LIMIT = "org.jboss.soa.esb.invm.retry.limit";
public static final String SMTP_HOST = "org.jboss.soa.esb.mail.smtp.host";
public static final String SMTP_USERNAME = "org.jboss.soa.esb.mail.smtp.user";
Added: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tx/JBESB_2866_UnitTest.java 2009-10-06 15:32:25 UTC (rev 29539)
@@ -0,0 +1,88 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.internal.soa.esb.couriers.tx;
+
+import junit.framework.TestCase;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.addressing.eprs.InVMEpr;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.internal.soa.esb.couriers.transport.InVMTransport;
+
+import java.net.URI;
+
+/**
+ * Test for https://jira.jboss.org/jira/browse/JBESB-2866.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JBESB_2866_UnitTest extends TestCase {
+
+ public void test_passByVal() throws Throwable {
+ test(true);
+ }
+
+ public void test_passByRef() throws Throwable {
+ test(false);
+ }
+
+ public void test(boolean passByVal) throws Throwable {
+ InVMEpr epr = new InVMEpr(new URI("invm://service"));
+ Message message = MessageFactory.getInstance().getMessage();
+
+ epr.setPassByValue(passByVal);
+ MockInVMXAResource resource = new MockInVMXAResource(epr, InVMTransport.toDeliveryObject(message, epr.getPassByValue()), InVMXAResource.operation.REMOVE);
+
+ // In reality, a new XAResource would be created for each retry, but it's the underlying
+ // message that matters, so we just need to create 1 instance of MockInVMXAResource and
+ // retry the rollback multiple times...
+ resource.rollback(null);
+ resource.rollback(null);
+ resource.rollback(null);
+ resource.rollback(null);
+ resource.rollback(null);
+ assertEquals(5, resource.deliveryCount);
+ assertFalse(resource.deliveredToDQL); // not set yet
+
+ // The retry count (default 5) has been hit now. Next attempt should result
+ // in the message being sent to the DLQ...
+ resource.rollback(null);
+ assertEquals(5, resource.deliveryCount); // Same as last time
+ assertTrue(resource.deliveredToDQL); // set now
+ }
+
+ private class MockInVMXAResource extends InVMXAResource {
+
+ private int deliveryCount;
+ private boolean deliveredToDQL;
+
+ public MockInVMXAResource(final InVMEpr inVMEpr, final Object msgObject, final operation op) {
+ super(inVMEpr, msgObject, op);
+ }
+
+ protected void deliverTx() throws Exception {
+ deliveryCount++;
+ }
+
+ protected void deliverToDLQ(Message message) throws MessageDeliverException {
+ deliveredToDQL = true;
+ }
+ }
+}
More information about the jboss-svn-commits
mailing list