[jboss-svn-commits] JBL Code SVN: r19568 - in labs/jbossesb/trunk/product/rosetta/src/org/jboss: soa/esb/listeners/message and 1 other directory.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue Apr 15 06:29:37 EDT 2008


Author: kevin.conner at jboss.com
Date: 2008-04-15 06:29:36 -0400 (Tue, 15 Apr 2008)
New Revision: 19568

Modified:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
Reintroduce transactional behaviour: JBESB-1665

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2008-04-15 10:14:25 UTC (rev 19567)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2008-04-15 10:29:36 UTC (rev 19568)
@@ -79,7 +79,6 @@
     /** Reference to a Queue or Topic Connection, we only need one per pool */
     protected Connection jmsConnection ;
     
-    
     /** The Indentifier of the pool */
     private Map<String, String> poolKey;
 
@@ -189,11 +188,6 @@
             ArrayList<JmsSession> inUseSessions = inUseSessionsMap.get(mode);
             if (freeSessions.size() > 0)
             {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Returning session, poolsize=" + getSessionsInPool() 
-                            + ", maxsize=" + MAX_SESSIONS 
-                            + ", number of pools=" + JmsConnectionPoolContainer.getNumberOfPools());
-                }
                 final JmsSession session = freeSessions.remove(freeSessions.size()-1);
                 inUseSessions.add(session);
                 return session ;

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java	2008-04-15 10:14:25 UTC (rev 19567)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java	2008-04-15 10:29:36 UTC (rev 19568)
@@ -61,12 +61,12 @@
     /**
      * Cleanup actions
      */
-    private enum Cleanup { close, release }
+    private enum Cleanup { close, release, none }
     
     /**
      * The cleanup action for the synchronization.
      */
-    private Cleanup cleanupAction = Cleanup.close ;
+    private Cleanup cleanupAction = Cleanup.none ;
     
     /**
      * Create the session wrapper.
@@ -132,14 +132,28 @@
         return (TopicSubscriber)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {TopicSubscriber.class}, handler);
     }
     
-    protected void handleCloseSession(final JmsConnectionPool jmsConnectionPool)
+    protected synchronized void handleCloseSession(final JmsConnectionPool jmsConnectionPool)
     {
-        cleanupAction = Cleanup.close ;
+        if (associated)
+        {
+            cleanupAction = Cleanup.close ;
+        }
+        else
+        {
+            pool.handleCloseSession(this) ;
+        }
     }
     
-    protected void handleReleaseSession(JmsConnectionPool jmsConnectionPool)
+    protected synchronized void handleReleaseSession(JmsConnectionPool jmsConnectionPool)
     {
-        cleanupAction = Cleanup.release ;
+        if (associated)
+        {
+            cleanupAction = Cleanup.release ;
+        }
+        else
+        {
+            pool.handleReleaseSession(this) ;
+        }
     }
     
     protected synchronized void associate()
@@ -147,6 +161,7 @@
     {
         if (!associated)
         {
+            cleanupAction = Cleanup.none ;
             final XAResource resource = session.getXAResource() ;
             final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
             try
@@ -190,6 +205,8 @@
         case release:
             pool.handleReleaseSession(this) ;
             break ;
+        case none:
+            // Reference held by caller
         }
         associated = false ;
     }

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2008-04-15 10:14:25 UTC (rev 19567)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2008-04-15 10:29:36 UTC (rev 19568)
@@ -32,12 +32,13 @@
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.couriers.CourierFactory;
 import org.jboss.soa.esb.couriers.CourierTimeoutException;
 import org.jboss.soa.esb.couriers.CourierUtil;
 import org.jboss.soa.esb.couriers.FaultMessageException;
-import org.jboss.soa.esb.couriers.TwoWayCourier;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.listeners.ListenerUtil;
@@ -86,6 +87,10 @@
          */
         private long errorDelay ;
 
+        private TransactionStrategy transactionStrategy;
+        private boolean transactional = false;
+        private boolean rollbackOnPipelineFaults = true;
+
         /**
 	 * public constructor
 	 *
@@ -156,6 +161,11 @@
                     }
                 }
                 _latencySecs = lSeconds ;
+                
+                transactional = _config.getBooleanAttribute(ListenerTagNames.TRANSACTED_TAG, false) ;
+                transactionStrategy = TransactionStrategy.getTransactionStrategy(transactional) ;
+                
+                rollbackOnPipelineFaults = _config.getBooleanAttribute(ListenerTagNames.ROLLBACK_ON_PIPELINE_FAULTS, true);
 	}
 
         /**
@@ -170,32 +180,20 @@
             try
             {
                 pipeline = new ActionProcessingPipeline(_config) ;
+                pipeline.setTransactional(transactional);
                 pipeline.initialise() ;
             }
             catch (final ConfigurationException ce)
             {
                 throw new ManagedLifecycleException("Error configuring action processing pipeline", ce) ;
             }
+            
             this.pipeline = pipeline ;
-            final TwoWayCourier pickUpCourier ;
+            final PickUpOnlyCourier pickUpCourier ;
             try
             {
-                pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
-                try
-                {
-                    final Method setPollLatency = pickUpCourier.getClass().getMethod(
-                        "setPollLatency", new Class[] { Long.class });
-                    setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
-                }
-                catch (final NoSuchMethodException nsme)
-                {
-                        // OK, just leave it null
-                }
-                catch (final Exception ex)
-                {
-                    CourierUtil.cleanCourier(pickUpCourier);
-                    throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex) ;
-                }
+                pickUpCourier = getCourier() ;
+                cleanCourier(pickUpCourier) ;
             }
             catch (final MalformedEPRException mepre)
             {
@@ -205,16 +203,13 @@
             {
                 throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
             }
-
-            _pickUpCourier = pickUpCourier ;
-
+            
             try
             {
                 RegistryUtil.register(_config, _epr);
             }
             catch (final RegistryException re)
             {
-                CourierUtil.cleanCourier(_pickUpCourier);
                 throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
             }
         }
@@ -261,17 +256,49 @@
             }
         }
 
+        /**
+         * We have JMS transactional delivery/work semantics: before pulling a unit of work
+         * we start a transaction. If the pipeline completes successfully then we will
+         * commit that transaction and the OUW will be deleted. If we have to roll back
+         * the transaction then the UOW will be placed back on the input "queue" (assumes that
+         * the courier is transactional).
+         * 
+         * @param maxWaitMillis
+         */
 	public void waitForEventAndProcess (long maxWaitMillis)
 	{
 		Message message = null ;
+		boolean problem = false;
+		
+                PickUpOnlyCourier pickUpCourier = null ;
 		try
 		{
-			message = (maxWaitMillis > 0) ? _pickUpCourier
+			transactionStrategy.begin();
+			
+			pickUpCourier = getCourier() ;
+			
+			message = (maxWaitMillis > 0) ? pickUpCourier
 					.pickup(maxWaitMillis) : null;
                         errorDelay = 0 ;
 		}
+		catch (TransactionStrategyException ex)
+		{
+			_logger.error("Could not begin transaction!");
+			
+			problem = true;
+			
+			return;
+		}
+                catch (MalformedEPRException e)
+                {
+                        problem = true;
+                        
+                        return;
+                }
 		catch (CourierTimeoutException e)
 		{
+			problem = true;
+			
 			return;
 		}
 		catch (FaultMessageException fme)
@@ -291,25 +318,39 @@
                         }
                         _logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
                         waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
+
+			problem = true;
+			
 			return;
 		}
+		finally
+		{
+			if (problem || (message == null))
+			{
+			    cleanCourier(pickUpCourier) ;
+			
+				rollbackTransaction();
+			}
+		}
 
 		if (null != message)
 		{
-                    final Message pipelineMessage = message ;
-                    final Runnable pipelineRunner = new Runnable() {
-                        public void run() {
-                            try {
-                                pipeline.process(pipelineMessage) ;
-                            } finally {
-                                updateThreadCount(-1) ;
-                            }
-                        }
-                    } ;
-                    updateThreadCount(+1);
-                    _execService.execute(pipelineRunner);
+			try
+			{
+				final Message pipelineMessage = message ;
+				final Object txHandle = transactionStrategy.suspend();
+				final TransactionalRunner txRunner = new TransactionalRunner(pickUpCourier, pipelineMessage, txHandle);
+				
+				updateThreadCount(+1);
+				_execService.execute(txRunner);
+			}
+			catch (TransactionStrategyException ex)
+			{
+				_logger.warn("Caught transaction related exception: ", ex);
+				cleanCourier(pickUpCourier);
+				rollbackTransaction();
+			}
 		}
-
 	} // ________________________________
 
         /**
@@ -392,7 +433,131 @@
                 }
             }
         }
+        
+        private void rollbackTransaction ()
+        {
+        	try
+        	{
+        		transactionStrategy.rollbackOnly();
+        		transactionStrategy.terminate();
+        	}
+        	catch (Throwable ex)
+        	{
+        		_logger.warn("Problem while attempting to rollback transaction!"); // timeout should catch it next!
+        	}
+        }
+        
+        private PickUpOnlyCourier getCourier()
+            throws MalformedEPRException, CourierException
+        {
+            PickUpOnlyCourier pickUpCourier = _pickUpCourier;
+            if (transactional || (pickUpCourier == null))
+            {
+                pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
+                try
+                {
+                    final Method setPollLatency = pickUpCourier.getClass().getMethod(
+                        "setPollLatency", new Class[] { Long.class });
+                    setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
+                }
+                catch (final NoSuchMethodException nsme)
+                {
+                        // OK, just leave it null
+                }
+                catch (final Throwable th)
+                {
+                    CourierUtil.cleanCourier(pickUpCourier);
+                    throw new CourierException("Problems invoking setPollLatency(long)", th);
+                }
+                
+                if (!transactional)
+                {
+                    _pickUpCourier = pickUpCourier ;
+                }
+            }
 
+            return pickUpCourier;
+        }
+
+        private void cleanCourier(final PickUpOnlyCourier pickUpOnlyCourier)
+        {
+            if (transactional)
+            {
+                CourierUtil.cleanCourier(pickUpOnlyCourier) ;
+            }
+        }
+
+        class TransactionalRunner implements Runnable
+        {
+        	public TransactionalRunner (PickUpOnlyCourier courier, Message pipelineMessage, Object txHandle)
+        	{
+        		_courier = courier;
+        		_pipelineMessage = pipelineMessage;
+        		_txHandle = txHandle;
+        	}
+        	
+        	public void run()
+        	{
+        		boolean problem = false;
+        		
+        		try
+        		{
+        			if (_txHandle != null)
+        			{
+        			    transactionStrategy.resume(_txHandle);
+        			}
+        			
+        			/*
+        			 * Current strategy is to commit as long as process returns true.
+        			 * If fails, or any exceptions are caught, then we roll back.
+        			 * 
+        			 * TODO re-examine the semantics around true/false from the pipeline.
+        			 */
+        			
+        			// TODO consider adding a RollbackOnFalse option to allow override.
+        			
+        			problem = rollbackOnPipelineFaults && !pipeline.process(_pipelineMessage);
+
+        			if (!problem)
+        			{
+        				transactionStrategy.terminate();
+        			}
+        		}
+        		catch (TransactionStrategyException ex)
+        		{
+        			problem = true;
+        			
+        			_logger.warn("TransactionalRunner caught transaction exception: ", ex);
+        		}
+        		catch (RuntimeException ex)
+        		{
+        			problem = true;
+        			
+        			throw ex;
+        		}
+        		catch (Throwable ex)
+        		{
+        			problem = true;
+        			
+        			_logger.warn("TransactionalRunner caught throwable: ",ex);
+        		}
+        		finally
+        		{
+        		    cleanCourier(_courier);
+        			if (problem)
+        			{
+        				rollbackTransaction();
+        			}
+        			
+        			updateThreadCount(-1);
+        		}
+        	}
+        	
+        	private PickUpOnlyCourier _courier;
+        	private Message _pipelineMessage;
+        	private Object _txHandle;
+        }
+        
         private ConfigTree _config;
 
         private String _eprCategoryName;




More information about the jboss-svn-commits mailing list