[jboss-svn-commits] JBL Code SVN: r19567 - in labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss: soa/esb/listeners/message and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue Apr 15 06:14:25 EDT 2008
Author: kevin.conner at jboss.com
Date: 2008-04-15 06:14:25 -0400 (Tue, 15 Apr 2008)
New Revision: 19567
Modified:
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
Reintroduce transactional behaviour: JBESB-1665
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2008-04-15 10:13:15 UTC (rev 19566)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2008-04-15 10:14:25 UTC (rev 19567)
@@ -188,11 +188,6 @@
ArrayList<JmsSession> inUseSessions = inUseSessionsMap.get(mode);
if (freeSessions.size() > 0)
{
- if (logger.isDebugEnabled()) {
- logger.debug("Returning session, poolsize=" + getSessionsInPool()
- + ", maxsize=" + MAX_SESSIONS
- + ", number of pools=" + JmsConnectionPoolContainer.getNumberOfPools());
- }
final JmsSession session = freeSessions.remove(freeSessions.size()-1);
inUseSessions.add(session);
return session ;
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2008-04-15 10:13:15 UTC (rev 19566)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2008-04-15 10:14:25 UTC (rev 19567)
@@ -61,12 +61,12 @@
/**
* Cleanup actions
*/
- private enum Cleanup { close, release }
+ private enum Cleanup { close, release, none }
/**
* The cleanup action for the synchronization.
*/
- private Cleanup cleanupAction = Cleanup.close ;
+ private Cleanup cleanupAction = Cleanup.none ;
/**
* Create the session wrapper.
@@ -132,14 +132,28 @@
return (TopicSubscriber)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {TopicSubscriber.class}, handler);
}
- protected void handleCloseSession(final JmsConnectionPool jmsConnectionPool)
+ protected synchronized void handleCloseSession(final JmsConnectionPool jmsConnectionPool)
{
- cleanupAction = Cleanup.close ;
+ if (associated)
+ {
+ cleanupAction = Cleanup.close ;
+ }
+ else
+ {
+ pool.handleCloseSession(this) ;
+ }
}
- protected void handleReleaseSession(JmsConnectionPool jmsConnectionPool)
+ protected synchronized void handleReleaseSession(JmsConnectionPool jmsConnectionPool)
{
- cleanupAction = Cleanup.release ;
+ if (associated)
+ {
+ cleanupAction = Cleanup.release ;
+ }
+ else
+ {
+ pool.handleReleaseSession(this) ;
+ }
}
protected synchronized void associate()
@@ -147,6 +161,7 @@
{
if (!associated)
{
+ cleanupAction = Cleanup.none ;
final XAResource resource = session.getXAResource() ;
final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
try
@@ -190,6 +205,8 @@
case release:
pool.handleReleaseSession(this) ;
break ;
+ case none:
+ // Reference held by caller
}
associated = false ;
}
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2008-04-15 10:13:15 UTC (rev 19566)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2008-04-15 10:14:25 UTC (rev 19567)
@@ -32,12 +32,13 @@
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.couriers.CourierTimeoutException;
import org.jboss.soa.esb.couriers.CourierUtil;
import org.jboss.soa.esb.couriers.FaultMessageException;
-import org.jboss.soa.esb.couriers.TwoWayCourier;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.ListenerUtil;
@@ -86,6 +87,10 @@
*/
private long errorDelay ;
+ private TransactionStrategy transactionStrategy;
+ private boolean transactional = false;
+ private boolean rollbackOnPipelineFaults = true;
+
/**
* public constructor
*
@@ -156,6 +161,11 @@
}
}
_latencySecs = lSeconds ;
+
+ transactional = _config.getBooleanAttribute(ListenerTagNames.TRANSACTED_TAG, false) ;
+ transactionStrategy = TransactionStrategy.getTransactionStrategy(transactional) ;
+
+ rollbackOnPipelineFaults = _config.getBooleanAttribute(ListenerTagNames.ROLLBACK_ON_PIPELINE_FAULTS, true);
}
/**
@@ -170,32 +180,20 @@
try
{
pipeline = new ActionProcessingPipeline(_config) ;
+ pipeline.setTransactional(transactional);
pipeline.initialise() ;
}
catch (final ConfigurationException ce)
{
throw new ManagedLifecycleException("Error configuring action processing pipeline", ce) ;
}
+
this.pipeline = pipeline ;
- final TwoWayCourier pickUpCourier ;
+ final PickUpOnlyCourier pickUpCourier ;
try
{
- pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
- try
- {
- final Method setPollLatency = pickUpCourier.getClass().getMethod(
- "setPollLatency", new Class[] { Long.class });
- setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
- }
- catch (final NoSuchMethodException nsme)
- {
- // OK, just leave it null
- }
- catch (final Exception ex)
- {
- CourierUtil.cleanCourier(pickUpCourier);
- throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex) ;
- }
+ pickUpCourier = getCourier() ;
+ cleanCourier(pickUpCourier) ;
}
catch (final MalformedEPRException mepre)
{
@@ -205,16 +203,13 @@
{
throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
}
-
- _pickUpCourier = pickUpCourier ;
-
+
try
{
RegistryUtil.register(_config, _epr);
}
catch (final RegistryException re)
{
- CourierUtil.cleanCourier(_pickUpCourier);
throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
}
}
@@ -261,17 +256,49 @@
}
}
+ /**
+ * We have JMS transactional delivery/work semantics: before pulling a unit of work
+ * we start a transaction. If the pipeline completes successfully then we will
+ * commit that transaction and the UOW will be deleted. If we have to roll back
+ * the transaction then the UOW will be placed back on the input "queue" (assumes that
+ * the courier is transactional).
+ *
+ * @param maxWaitMillis
+ */
public void waitForEventAndProcess (long maxWaitMillis)
{
Message message = null ;
+ boolean problem = false;
+
+ PickUpOnlyCourier pickUpCourier = null ;
try
{
- message = (maxWaitMillis > 0) ? _pickUpCourier
+ transactionStrategy.begin();
+
+ pickUpCourier = getCourier() ;
+
+ message = (maxWaitMillis > 0) ? pickUpCourier
.pickup(maxWaitMillis) : null;
errorDelay = 0 ;
}
+ catch (TransactionStrategyException ex)
+ {
+ _logger.error("Could not begin transaction!");
+
+ problem = true;
+
+ return;
+ }
+ catch (MalformedEPRException e)
+ {
+ problem = true;
+
+ return;
+ }
catch (CourierTimeoutException e)
{
+ problem = true;
+
return;
}
catch (FaultMessageException fme)
@@ -291,25 +318,39 @@
}
_logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
+
+ problem = true;
+
return;
}
+ finally
+ {
+ if (problem || (message == null))
+ {
+ cleanCourier(pickUpCourier) ;
+
+ rollbackTransaction();
+ }
+ }
if (null != message)
{
- final Message pipelineMessage = message ;
- final Runnable pipelineRunner = new Runnable() {
- public void run() {
- try {
- pipeline.process(pipelineMessage) ;
- } finally {
- updateThreadCount(-1) ;
- }
- }
- } ;
- updateThreadCount(+1);
- _execService.execute(pipelineRunner);
+ try
+ {
+ final Message pipelineMessage = message ;
+ final Object txHandle = transactionStrategy.suspend();
+ final TransactionalRunner txRunner = new TransactionalRunner(pickUpCourier, pipelineMessage, txHandle);
+
+ updateThreadCount(+1);
+ _execService.execute(txRunner);
+ }
+ catch (TransactionStrategyException ex)
+ {
+ _logger.warn("Caught transaction related exception: ", ex);
+ cleanCourier(pickUpCourier);
+ rollbackTransaction();
+ }
}
-
} // ________________________________
/**
@@ -403,7 +444,131 @@
}
}
}
+
+ private void rollbackTransaction ()
+ {
+ try
+ {
+ transactionStrategy.rollbackOnly();
+ transactionStrategy.terminate();
+ }
+ catch (Throwable ex)
+ {
+ _logger.warn("Problem while attempting to rollback transaction!"); // timeout should catch it next!
+ }
+ }
+
+ private PickUpOnlyCourier getCourier()
+ throws MalformedEPRException, CourierException
+ {
+ PickUpOnlyCourier pickUpCourier = _pickUpCourier;
+ if (transactional || (pickUpCourier == null))
+ {
+ pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
+ try
+ {
+ final Method setPollLatency = pickUpCourier.getClass().getMethod(
+ "setPollLatency", new Class[] { Long.class });
+ setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
+ }
+ catch (final NoSuchMethodException nsme)
+ {
+ // OK, just leave it null
+ }
+ catch (final Throwable th)
+ {
+ CourierUtil.cleanCourier(pickUpCourier);
+ throw new CourierException("Problems invoking setPollLatency(long)", th);
+ }
+
+ if (!transactional)
+ {
+ _pickUpCourier = pickUpCourier ;
+ }
+ }
+ return pickUpCourier;
+ }
+
+ private void cleanCourier(final PickUpOnlyCourier pickUpOnlyCourier)
+ {
+ if (transactional)
+ {
+ CourierUtil.cleanCourier(pickUpOnlyCourier) ;
+ }
+ }
+
+ class TransactionalRunner implements Runnable
+ {
+ public TransactionalRunner (PickUpOnlyCourier courier, Message pipelineMessage, Object txHandle)
+ {
+ _courier = courier;
+ _pipelineMessage = pipelineMessage;
+ _txHandle = txHandle;
+ }
+
+ public void run()
+ {
+ boolean problem = false;
+
+ try
+ {
+ if (_txHandle != null)
+ {
+ transactionStrategy.resume(_txHandle);
+ }
+
+ /*
+ * Current strategy is to commit as long as process returns true.
+ * If it fails, or any exceptions are caught, then we roll back.
+ *
+ * TODO re-examine the semantics around true/false from the pipeline.
+ */
+
+ // TODO consider adding a RollbackOnFalse option to allow override.
+
+ problem = rollbackOnPipelineFaults && !pipeline.process(_pipelineMessage);
+
+ if (!problem)
+ {
+ transactionStrategy.terminate();
+ }
+ }
+ catch (TransactionStrategyException ex)
+ {
+ problem = true;
+
+ _logger.warn("TransactionalRunner caught transaction exception: ", ex);
+ }
+ catch (RuntimeException ex)
+ {
+ problem = true;
+
+ throw ex;
+ }
+ catch (Throwable ex)
+ {
+ problem = true;
+
+ _logger.warn("TransactionalRunner caught throwable: ",ex);
+ }
+ finally
+ {
+ cleanCourier(_courier);
+ if (problem)
+ {
+ rollbackTransaction();
+ }
+
+ updateThreadCount(-1);
+ }
+ }
+
+ private PickUpOnlyCourier _courier;
+ private Message _pipelineMessage;
+ private Object _txHandle;
+ }
+
private ConfigTree _config;
private String _eprCategoryName;
More information about the jboss-svn-commits
mailing list