[jboss-svn-commits] JBL Code SVN: r16198 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/soa/esb/common and 3 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Oct 31 09:05:18 EDT 2007


Author: kevin.conner at jboss.com
Date: 2007-10-31 09:05:17 -0400 (Wed, 31 Oct 2007)
New Revision: 16198

Added:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategyException.java
Modified:
   labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ScheduleListener.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/ScheduledListenerMapper.java
   labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/META-INF/jboss-esb.xml
Log:
Added transaction capability for redelivery/message store: JBESB-1189

Modified: labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd
===================================================================
--- labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd	2007-10-31 11:28:45 UTC (rev 16197)
+++ labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd	2007-10-31 13:05:17 UTC (rev 16198)
@@ -359,6 +359,11 @@
                             <xsd:documentation xml:lang="en">Message Composer class name.</xsd:documentation>
                         </xsd:annotation>
                     </xsd:attribute>
+                    <xsd:attribute name="transacted" type="xsd:boolean">
+                        <xsd:annotation>
+                            <xsd:documentation xml:lang="en">Should the schedule execute in a transacted environment?</xsd:documentation>
+                        </xsd:annotation>
+                    </xsd:attribute>
                 </xsd:extension>
             </xsd:complexContent>
         </xsd:complexType>

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java	2007-10-31 11:28:45 UTC (rev 16197)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java	2007-10-31 13:05:17 UTC (rev 16198)
@@ -22,7 +22,12 @@
 
 import java.io.File;
 
+import javax.transaction.Status;
+import javax.transaction.TransactionManager;
+
+import org.apache.log4j.Logger;
 import org.jboss.system.ServiceMBeanSupport;
+import org.jboss.tm.TransactionManagerLocator;
 
 /**
  * This MBean wraps the configuration to allow ServiceBindingManager
@@ -72,5 +77,106 @@
                 System.setProperty(Environment.PROPERTIES_FILE, this.propertyFile);
             }
         }
+        // We also setup the transaction strategy here
+        TransactionStrategy.setTransactionStrategy(new JTATransactionStrategy()) ;
     }
+    
+    /**
+     * The JTA transaction strategy used in the application server; delegates to a JTA TransactionManager.
+     * @author kevin
+     */
+    private static class JTATransactionStrategy extends TransactionStrategy
+    {
+        /**
+         * The logger for this class.
+         */
+        private static final Logger LOGGER = Logger.getLogger(JTATransactionStrategy.class) ;
+        
+        /**
+         * The transaction manager, located once when this strategy is constructed.
+         */
+        private final TransactionManager tm ;
+        
+        /**
+         * Construct the JTA transaction strategy, locating the transaction manager via TransactionManagerLocator.
+         */
+        JTATransactionStrategy()
+        {
+            tm = TransactionManagerLocator.getInstance().locate() ;
+        }
+        
+        /**
+         * Begin a transaction on the current thread.
+         * @throws TransactionStrategyException if the transaction manager fails to begin; wraps the original cause.
+         */
+        public void begin()
+            throws TransactionStrategyException
+        {
+            try
+            {
+                tm.begin() ;
+                LOGGER.debug("Transaction started on current thread") ;
+            }
+            catch (final Throwable th) // Throwable: Errors are wrapped too, not only exceptions
+            {
+                LOGGER.debug("Failed to start transaction on current thread", th) ;
+                throw new TransactionStrategyException("Failed to begin transaction on current thread", th) ;
+            }
+        }
+        
+        /**
+         * Terminate the transaction on the current thread: commit it if it is
+         * active, roll it back if it has been marked for rollback, and in any
+         * other state just suspend it so that it is disassociated.
+         * @throws TransactionStrategyException if the status check or the completion fails; wraps the original cause.
+         */
+        public void terminate()
+            throws TransactionStrategyException
+        {
+            try
+            {
+                final int status = tm.getStatus() ;
+                switch(status)
+                {
+                    case Status.STATUS_ACTIVE:
+                        LOGGER.debug("Committing transaction on current thread") ;
+                        tm.commit() ;
+                        break ;
+                    case Status.STATUS_MARKED_ROLLBACK:
+                        LOGGER.debug("Rollback transaction on current thread") ;
+                        tm.rollback();
+                        break ;
+                    default:
+                        // Transaction is in some other state, neither committed nor rolled back here: just disassociate
+                        if (LOGGER.isDebugEnabled())
+                        {
+                            LOGGER.debug("Suspending transaction on current thread, status: " + status) ;
+                        }
+                        tm.suspend() ;
+                }
+            }
+            catch (final Throwable th) // covers failures from getStatus, commit, rollback and suspend alike
+            {
+                LOGGER.debug("Failed to terminate transaction on current thread", th) ;
+                throw new TransactionStrategyException("Failed to terminate transaction on current thread", th) ;
+            }
+        }
+        
+        /**
+         * Mark the current transaction for rollback.
+         * @throws TransactionStrategyException if the transaction manager fails to mark it; wraps the original cause.
+         */
+        public void rollbackOnly()
+            throws TransactionStrategyException
+        {
+            try
+            {
+                tm.setRollbackOnly();
+            }
+            catch (final Throwable th) // unlike begin/terminate, this failure is not logged here
+            {
+                throw new TransactionStrategyException("Failed to mark the transaction on current thread for rollback", th) ;
+            }
+        }
+    }
 }

Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java	2007-10-31 13:05:17 UTC (rev 16198)
@@ -0,0 +1,128 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.common;
+
+/**
+ * This class represents the transaction strategy that is currently in force
+ * within the ESB.  At present there are two strategies employed, a null strategy
+ * (the default, whose operations do nothing) when running outside of an application
+ * server environment and a JTA strategy when running within.
+ * 
+ * @author <a href='mailto:kevin.conner at jboss.com'>Kevin Conner</a>
+ */
+public abstract class TransactionStrategy
+{
+    /**
+     * The null transaction strategy: a single shared instance whose operations do nothing.
+     */
+    private static final TransactionStrategy NULL_TRANSACTION_STRATEGY = new NullTransactionStrategy() ;
+    /**
+     * The current transaction strategy, initially the null strategy.
+     */
+    private static volatile TransactionStrategy transactionStrategy = NULL_TRANSACTION_STRATEGY ;
+    
+    /**
+     * Get the active transaction strategy as set at the time of the call; later changes do not affect the returned instance.
+     * @param transacted True if the current transacted strategy is required, false for a null strategy.
+     * @return The transaction strategy, never null.
+     */
+    public static TransactionStrategy getTransactionStrategy(final boolean transacted)
+    {
+        return (transacted ? transactionStrategy : NULL_TRANSACTION_STRATEGY) ;
+    }
+    
+    /**
+     * Set the active transaction strategy.
+     * @param transactionStrategy The transaction strategy, or null to revert to the null strategy.
+     */
+    public static void setTransactionStrategy(final TransactionStrategy transactionStrategy)
+    {
+        if (transactionStrategy == null)
+        {
+            TransactionStrategy.transactionStrategy = NULL_TRANSACTION_STRATEGY ;
+        }
+        else
+        {
+            TransactionStrategy.transactionStrategy = transactionStrategy ;
+        }
+    }
+    
+    /**
+     * Begin a transaction on the current thread.
+     * @throws TransactionStrategyException if the transaction cannot be started.
+     */
+    public abstract void begin()
+        throws TransactionStrategyException ; 
+    
+    /**
+     * Terminate the transaction on the current thread.
+     * If the transaction has been marked for rollback then it
+     * will be rolled back, otherwise it will be committed.
+     * @throws TransactionStrategyException if the transaction cannot be terminated.
+     */
+    public abstract void terminate()
+        throws TransactionStrategyException ; 
+    
+    /**
+     * Mark the current transaction for rollback.
+     * @throws TransactionStrategyException if the transaction cannot be marked.
+     */
+    public abstract void rollbackOnly()
+        throws TransactionStrategyException ;
+    
+    /**
+     * The null transaction strategy; every operation is a no-op.
+     * @author kevin
+     */
+    private static class NullTransactionStrategy extends TransactionStrategy
+    {
+        /**
+         * No-op: the null strategy does not start a transaction.
+         * @throws TransactionStrategyException never thrown by this implementation.
+         */
+        public void begin()
+            throws TransactionStrategyException
+        {
+        }
+        
+        /**
+         * Terminate the transaction on the current thread.
+         * No-op: the null strategy has no transaction to commit
+         * or roll back.
+         * @throws TransactionStrategyException never thrown by this implementation.
+         */
+        public void terminate()
+            throws TransactionStrategyException
+        {
+        }
+        
+        /**
+         * No-op: the null strategy has no transaction to mark for rollback.
+         * @throws TransactionStrategyException never thrown by this implementation.
+         */
+        public void rollbackOnly()
+            throws TransactionStrategyException
+        {
+        }
+    }
+}
\ No newline at end of file


Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategyException.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategyException.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategyException.java	2007-10-31 13:05:17 UTC (rev 16198)
@@ -0,0 +1,72 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.common;
+
+/**
+ * The checked exception raised to report failures of transaction strategy operations.
+ * 
+ * @author <a href='mailto:kevin.conner at jboss.com'>Kevin Conner</a>
+ */
+public class TransactionStrategyException extends Exception
+{
+    /**
+     * Serial version UID of this exception.
+     */
+    private static final long serialVersionUID = 7547084336832099461L;
+
+    /**
+     * Construct a transaction strategy exception with no message and no cause.
+     */
+    public TransactionStrategyException()
+    {
+    }
+
+    /**
+     * Construct a transaction strategy exception with the specified message.
+     * @param message The exception message.
+     */
+    public TransactionStrategyException(final String message) 
+    {
+        super(message);
+    }
+
+    /**
+     * Construct a transaction strategy exception with the associated cause.
+     * @param cause The underlying failure being wrapped.
+     */
+    public TransactionStrategyException(final Throwable cause)
+    {
+        super(cause);
+    }
+
+    /**
+     * Construct a transaction strategy exception with the specified message and associated cause.
+     * @param message The exception message.
+     * @param cause The underlying failure being wrapped.
+     */
+    public TransactionStrategyException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+}
+ 
\ No newline at end of file


Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategyException.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java	2007-10-31 11:28:45 UTC (rev 16197)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java	2007-10-31 13:05:17 UTC (rev 16198)
@@ -56,6 +56,7 @@
 	
 	/** Listeners */
 	public static final String LISTENER_CLASS_TAG            = "listenerClass";
+        public static final String TRANSACTED_TAG            = "transacted";
     
 	/** Deployment */
 	public static final String DEPLOYMENT_NAME_TAG 			= "deployment";

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ScheduleListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ScheduleListener.java	2007-10-31 11:28:45 UTC (rev 16197)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ScheduleListener.java	2007-10-31 13:05:17 UTC (rev 16198)
@@ -24,6 +24,8 @@
 import org.jboss.soa.esb.listeners.message.ActionProcessingPipeline;
 import org.jboss.soa.esb.schedule.ScheduledEventListener;
 import org.jboss.soa.esb.schedule.SchedulingException;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.Initializable;
@@ -49,6 +51,10 @@
      * Event message processor. 
      */
     private Initializable eventProcessor;
+    /**
+     * The transaction strategy.
+     */
+    private final TransactionStrategy transactionStrategy ;
 
     /**
      * Construct the managed lifecycle.
@@ -74,6 +80,9 @@
             throw new ConfigurationException("Failed to instantiate Event Processor class [" + eventProcessorClass + "].", e);
         }
 
+        final boolean transacted = config.getBooleanAttribute(ListenerTagNames.TRANSACTED_TAG, false) ;
+        transactionStrategy = TransactionStrategy.getTransactionStrategy(transacted) ;
+        
         if(!(eventProcessor instanceof ScheduledEventListener) && !(eventProcessor instanceof ScheduledEventMessageComposer)) {
             throwBadImplException(eventProcessorClass);
         }
@@ -100,16 +109,30 @@
     public void onSchedule() throws SchedulingException {
         Message message;
 
-        if(eventProcessor instanceof ScheduledEventMessageComposer) {
-            ScheduledEventMessageComposer composer = (ScheduledEventMessageComposer)eventProcessor;
-            message = composer.composeMessage();
-            if(message != null)
-            {
-	            pipeline.process(message);
-	            composer.onProcessingComplete(message);
+        try {
+            transactionStrategy.begin() ;
+            boolean rollbackOnly = true ; // assume failure until the work below completes normally
+            try {
+                if(eventProcessor instanceof ScheduledEventMessageComposer) {
+                    ScheduledEventMessageComposer composer = (ScheduledEventMessageComposer)eventProcessor;
+                    message = composer.composeMessage();
+                    if(message != null)
+                    {
+        	            pipeline.process(message);
+        	            composer.onProcessingComplete(message);
+                    }
+                } else {
+                    ((ScheduledEventListener)eventProcessor).onSchedule();
+                }
+                rollbackOnly = false ; // reached only when nothing above threw
+            } finally {
+                if (rollbackOnly) {
+                    transactionStrategy.rollbackOnly() ; // NOTE(review): if this throws, terminate() below is skipped
+                }
+                transactionStrategy.terminate() ; // NOTE(review): an exception thrown here replaces one propagating from the try block
             }
-        } else {
-            ((ScheduledEventListener)eventProcessor).onSchedule();
+        } catch (final TransactionStrategyException tse) { // failures from begin, rollbackOnly or terminate
+            throw new SchedulingException("Unexpected transaction strategy exception", tse) ;
         }
     }
 

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/ScheduledListenerMapper.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/ScheduledListenerMapper.java	2007-10-31 11:28:45 UTC (rev 16197)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/ScheduledListenerMapper.java	2007-10-31 13:05:17 UTC (rev 16198)
@@ -35,6 +35,9 @@
 
         listenerNode.setAttribute("event-processor", scheduledListener.getEventProcessor());
         listenerNode.setAttribute(ListenerTagNames.LISTENER_CLASS_TAG, ScheduleListener.class.getName());
+        if (scheduledListener.isSetTransacted()) {
+            listenerNode.setAttribute(ListenerTagNames.TRANSACTED_TAG, Boolean.toString(scheduledListener.getTransacted()));
+        }
 
         // Map the <property> elements targeted at the listener - from the listener itself.
         MapperUtil.mapProperties(scheduledListener.getPropertyList(), listenerNode);

Modified: labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/META-INF/jboss-esb.xml
===================================================================
--- labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/META-INF/jboss-esb.xml	2007-10-31 11:28:45 UTC (rev 16197)
+++ labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/META-INF/jboss-esb.xml	2007-10-31 13:05:17 UTC (rev 16198)
@@ -5,13 +5,15 @@
      -->
      
      <providers>
-          <jms-provider name="Default-JMS-Provider" connection-factory="ConnectionFactory">          
+          <jms-jca-provider name="Default-JMS-JCA-Provider" connection-factory="ConnectionFactory">          
               <jms-bus busid="DeadMessageQueue">
                   <jms-message-filter
                       dest-type="QUEUE"
                       dest-name="queue/DeadMessageQueue"
                   />
               </jms-bus>
+          </jms-jca-provider>
+          <jms-provider name="Default-JMS-Provider" connection-factory="ConnectionFactory">          
 			 <jms-bus busid="DataCollectorQueue">
                   <jms-message-filter
                       dest-type="QUEUE"
@@ -111,7 +113,8 @@
         </service>
         <service category="JBossESB-Internal" name="RedeliverService" description="Scheduled Service to Redeliver Messages">
             <listeners>
-                <scheduled-listener name="redeliver-scheduled-listener" scheduleidref="5-min-trigger" event-processor="org.jboss.soa.esb.schedule.RedeliverEventMessageComposer" />
+                <scheduled-listener name="redeliver-scheduled-listener" scheduleidref="5-min-trigger"
+                    event-processor="org.jboss.soa.esb.schedule.RedeliverEventMessageComposer" transacted="true"/>
             </listeners>
             <actions mep="OneWay">
                 <action name="RedeliverMessagesAction" class="org.jboss.soa.esb.actions.MessageRedeliverer">




More information about the jboss-svn-commits mailing list