[jboss-svn-commits] JBL Code SVN: r20946 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/internal/soa/esb/couriers and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Jul 7 14:06:37 EDT 2008


Author: mark.little at jboss.com
Date: 2008-07-07 14:06:37 -0400 (Mon, 07 Jul 2008)
New Revision: 20946

Added:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java
Modified:
   labs/jbossesb/trunk/product/docs/ProgrammersGuide.odt
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
Log:
http://jira.jboss.com/jira/browse/JBESB-1740

Modified: labs/jbossesb/trunk/product/docs/ProgrammersGuide.odt
===================================================================
(Binary files differ)

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java	2008-07-07 17:22:31 UTC (rev 20945)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java	2008-07-07 18:06:37 UTC (rev 20946)
@@ -23,7 +23,10 @@
 package org.jboss.internal.soa.esb.couriers;
 
 import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.tx.InVMXAResource;
 import org.jboss.soa.esb.addressing.eprs.InVMEpr;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.message.Message;
 
@@ -89,13 +92,18 @@
     }
 
     /**
-     * package the ESB message into the queue
+     * package the ESB message into the queue. If this is a transactional interaction
+     * then the deliver will return immediately, but the message may not go into
+     * the queue if the transaction subsequently rolls back. The caller must monitor
+     * the transaction outcome to determine the fate of the message. For example, register
+     * a Synchronization.
      *
-     * @param message Message - the message to deliverAsync
+     * @param message Message - the message to deliverAsync
      * @return boolean - the result of the delivery
      * @throws CourierException -
      *                          if problems were encountered
      */
+    
     public boolean deliver(Message message) {
         if (!isCourierActive()) {
             return false;
@@ -105,44 +113,106 @@
             return false;
         }
 
-        synchronized (messageQueue) {
+        try
+        {
             /*
-             * If not pass-by-reference then use a copy of
-             * the input message.
+             * Are we operating within the scope of a global transaction?
              */
             
-            if (!passByReference)
+            if (isTransactional())
             {
-                try
-                {
-                    messageQueue.add(message.copy());
-                }
-                catch (IOException ex)
-                {
-                    logger.warn("Could not create a copy of message to pass by value: "+ex);
-                    
-                    return false;
-                }
+                /*
+                 * If we are transactional, then hold off on the insertion until the transaction commits.
+                 * The XAResource is given the duty of doing the insert in that case.
+                 * If it doesn't commit, then the insert won't happen and the client needs to monitor
+                 * the transaction outcome.
+                 * 
+                 * We could get away with using a Synchronization rather than an XAResource
+                 * since there's no durable component in this case. However, Synchronizations aren't
+                 * guaranteed to be called in all circumstances and unless you've read the JTS spec. it'll
+                 * make maintaining things that little bit more difficult. Using a Synchronization would
+                 * be ok with JBossTS, but not necessarily other implementations. We could end up with
+                 * a non-atomic outcome as a result.
+                 * 
+                 * The downside of this is that we could force a 1PC transaction to become 2PC when in
+                 * fact there's no real difference to the 1PC participant. But the upside is we get
+                 * consistency!
+                 */
+                
+                TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
+                
+                txStrategy.enlistResource(new InVMXAResource(this, message, InVMXAResource.operation.INSERT));
+                
+                /*
+                 * Can't do lockstep wait here because otherwise the transaction may not terminate if this
+                 * is the transaction controller thread!
+                 */
+                
+                return true;  // does not mean message got into the queue.
             }
             else
-                messageQueue.add(message);
+            {
+                synchronized (messageQueue) {
+                    /*
+                     * If not pass-by-reference then use a copy of
+                     * the input message.
+                     */
+                    
+                    if (!passByReference)
+                    {
+                        try
+                        {
+                            messageQueue.add(message.copy());
+                        }
+                        catch (IOException ex)
+                        {
+                            logger.warn("Could not create a copy of message to pass by value: "+ex);
+                            
+                            return false;
+                        }
+                    }
+                    else
+                        messageQueue.add(message);
 
-            // Notify 1 waiting pickup thread of the delivery...
-            messageQueue.notify();
+                    // Notify 1 waiting pickup thread of the delivery...
+                    messageQueue.notify();
 
-            if (deliveryTimeout > 0) {
-                try {
-                    // Wait on notification from the pickup thread...
-                    messageQueue.wait(deliveryTimeout);
-                } catch (InterruptedException e) {
-                    logger.warn("Timeout expired while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'.", e);
+                    if (deliveryTimeout > 0) {
+                        try {
+                            // Wait on notification from the pickup thread...
+                            messageQueue.wait(deliveryTimeout);
+                        } catch (InterruptedException e) {
+                            logger.warn("Timeout expired while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'.", e);
+                        }
+                    }
                 }
+                
+                return true;
             }
         }
-
-        return true;
+        catch (final Throwable ex)
+        {
+            logger.warn("InVMCourier delivery caught: "+ex);
+            
+            return false;
+        }
     }
 
+    /**
+     * Get a message from the queue or wait for the specified time in case one
+     * arrives.
+     * 
+     * If this is a transactional interaction then the message will be placed back
+     * on the queue if the enclosing transaction rolls back. Note that for performance
+     * reasons it is not guaranteed that the message will go back at the same relative
+     * position.
+     * 
+     * @param millis the time to wait, in milliseconds, if the queue is empty.
+     * @return a Message or <code>null</code> if there is nothing on the queue.
+     */
+    
+    // see associated test
+    
     public Message pickup(long millis) {
         if (!isCourierActive()) {
             return null;
@@ -150,22 +220,49 @@
 
         Message message = null;
 
-        millis = Math.max(millis, 100);
-        synchronized (messageQueue) {
-            if (messageQueue.isEmpty()) {
-                try {
-                    messageQueue.wait(millis);
-                } catch (InterruptedException e) {
-                    logger.debug("Pickup thread '" + Thread.currentThread().getName() + "' interrupted while waiting on delivery notification or timeout.", e);
+        try
+        {
+            millis = Math.max(millis, 100);
+            synchronized (messageQueue) {
+                if (messageQueue.isEmpty()) {
+                    try {
+                        messageQueue.wait(millis);
+                    } catch (InterruptedException e) {
+                        logger.debug("Pickup thread '" + Thread.currentThread().getName() + "' interrupted while waiting on delivery notification or timeout.", e);
+                    }
                 }
+                if (!messageQueue.isEmpty()) {
+                    message = messageQueue.remove();
+                }
+
+                // Notify 1 waiting delivery thread of the pickup...
+                messageQueue.notify();
             }
-            if (!messageQueue.isEmpty()) {
-                message = messageQueue.remove();
+
+            if (isTransactional())
+            {
+                /*
+                 * The message has already been removed from the queue. If the transaction rolls
+                 * back, the enlisted resource puts it back, though maybe not at the exact place it
+                 * was originally: other messages may have been removed successfully by other threads.
+                 * Restoring the position would mean maintaining a before and after image of the
+                 * queue. This is more a compensation transaction.
+                 */
+                
+                TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
+                InVMXAResource theResource = new InVMXAResource(this, message, InVMXAResource.operation.REMOVE);
+                           
+                txStrategy.enlistResource(theResource);
+                
+                // now we can deliver the message.
             }
-
-            // Notify 1 waiting delivery thread of the pickup...
-            messageQueue.notify();
         }
+        catch (final Throwable ex)
+        {
+            logger.warn("InVMCourier pickup caught: "+ex);
+            
+            return null;
+        }
 
         return message;
     }
@@ -189,6 +286,109 @@
     }
 
     public void reset() {
-        messageQueue.clear();
+        try  // empties the queue, unless called within a transaction
+        {
+            if (isTransactional())  // a reset here is only logged, never applied
+                logger.warn("InVMCourier reset called on transactional courier: ignoring reset for the sake of consistency!");
+            else
+                messageQueue.clear();
+        }
+        catch (final Throwable ex)  // covers isTransactional()'s CourierException; logged at debug only
+        {
+            logger.debug("InVMCourier reset caught: "+ex);
+        }
     }
+    
+    /**
+     * Used by transactional resource to deliver the message to the queue
+     * in the event of a transaction commit.
+     * Almost the same as normal delivery, except no lockstep wait. Lockstep
+     * does not make sense in the scope of queued transactional delivery.
+     */
+    
+    public boolean doDeliver (Message message)
+    {
+        // Commit-time delivery: queue the message (or a copy of it) and notify
+        // one thread waiting on the queue. There is no lockstep wait on this path.
+        synchronized (messageQueue)
+        {
+            final Message queued;
+
+            if (passByReference)
+            {
+                queued = message;
+            }
+            else
+            {
+                // Pass-by-value: the queue receives a copy of the input message.
+                try
+                {
+                    queued = message.copy();
+                }
+                catch (IOException ex)
+                {
+                    logger.warn("Could not create a copy of message to pass by value: "+ex);
+                    return false;
+                }
+            }
+
+            messageQueue.add(queued);
+            messageQueue.notify();
+        }
+        return true;
+    }
+    
+    /**
+     * Used by transactional resource to place message back on the queue. We remove from the head
+     * but insert to the tail, for speed. If individual messages need to be delivered in some well
+     * defined order then this could cause problems. However, there are no ordering guarantees with
+     * a range of transports, and particularly not in an asynchronous environment with arbitrary
+     * network latency.
+     * 
+     * If this does cause a problem then we should revisit. It will mean creating a fully transactional
+     * queue (done already in Arjuna so we may be able to lift that code). But that will affect
+     * performance. It may be easier to look at solving the ordering problem separately.
+     */
+    
+    public boolean doRedeliver (Message message)
+    {
+        synchronized (messageQueue) {
+            messageQueue.add(message);  // rolled-back pickup: the message rejoins at the tail
+            messageQueue.notify();  // wake one waiter, as doDeliver does, rather than leave it to time out
+        }
+        return true;  // always; a failure can only surface as an exception
+    }
+    
+    /*
+     * TODO this is used in a number of classes so should be a separate
+     * util routine.
+     */
+    
+    private boolean isTransactional() throws CourierException {
+        // True when the transaction strategy reports a current transaction. A
+        // transaction that exists but is no longer active is reported as an error.
+        try
+        {
+            final TransactionStrategy strategy = TransactionStrategy.getTransactionStrategy(true);
+            if (strategy == null)
+            {
+                return false;
+            }
+
+            final boolean inTransaction = (strategy.getTransaction() != null);
+            final boolean stillActive = strategy.isActive();
+
+            // If we have previously slept, the sleep may have outlasted the
+            // timeout associated with the transaction.
+            if (inTransaction && !stillActive)
+            {
+                throw new CourierException("Associated transaction is no longer active!");
+            }
+            return inTransaction;
+        }
+        catch (TransactionStrategyException cause)
+        {
+            throw new CourierException(cause);
+        }
+    }
 }

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2008-07-07 17:22:31 UTC (rev 20945)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2008-07-07 18:06:37 UTC (rev 20946)
@@ -126,9 +126,9 @@
 
         boolean transactional = isTransactional();
 
-        Serializable serilaizedMessage;
+        Serializable serializedMessage;
         try {
-            serilaizedMessage = Util.serialize(message);
+            serializedMessage = Util.serialize(message);
         } catch (Exception e) {
             throw new CourierTransportException("Unable to serialize ESB Message.", e);
         }
@@ -139,7 +139,7 @@
             PreparedStatement insertStatement = jdbcFactory.createInsertStatement(connection);
             try {
                 insertStatement.setString(1, msgId);
-                insertStatement.setObject(2, serilaizedMessage);
+                insertStatement.setObject(2, serializedMessage);
                 insertStatement.setString(3, State.Pending.getColumnValue());
                 insertStatement.setLong(4, System.currentTimeMillis());
 

Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java	2008-07-07 18:06:37 UTC (rev 20946)
@@ -0,0 +1,197 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.internal.soa.esb.couriers.tx;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.InVMCourier;
+import org.jboss.soa.esb.message.Message;
+
+/**
+ * This XAResource instance controls the InVM queue manipulation under the control
+ * of the transaction manager. Since InVM is inherently volatile (a crash failure
+ * will take the entire queue with it, along with the client and server), we have much
+ * less to worry about here in terms of recovery. Therefore we need to ensure that
+ * the transaction manager does not save this instance to the log, since it won't
+ * make sense upon recovery: the entire queue will have been destroyed.
+ * 
+ * This does mean that we could have non-atomic outcomes in the event of a crash where
+ * durable participants (e.g., database) were involved in the same transaction. But
+ * there is nothing we can do about that, without affecting the performance benefits of
+ * InVM. We will call this out explicitly in the documentation and the user needs to
+ * be aware of the possible consequences.
+ * 
+ * Could be a Synchronization if we could guarantee that afterCompletion will be called
+ * (where we would do the compensation). But since that's only a 'best effort' we need
+ * to make this an XAResource.
+ */
+
+public class InVMXAResource implements XAResource
+{
+    /** INSERT is enlisted by a transactional delivery, REMOVE by a transactional pickup. */
+    public enum operation { INSERT, REMOVE };
+
+    /** Ties a message and the operation performed on it to the courier that owns the queue. */
+    public InVMXAResource (InVMCourier courier, Message msg, final operation op)
+    {
+        _theCourier = courier;
+        _msg = msg;
+        _opcode = op;
+    }
+
+    /**
+     * During commit, we deliver the message on to the queue if we were sending a message.
+     * If we were receiving, then this is a no-op since we already changed the queue by
+     * removing the message.
+     * @throws XAException with XA_HEURHAZ if the message could not be put on the queue.
+     * @see javax.transaction.xa.XAResource#commit(javax.transaction.xa.Xid, boolean)
+     */
+    public void commit (Xid xid, boolean onePhase) throws XAException
+    {
+        if (_opcode == operation.INSERT)
+        {
+            boolean problem;
+            try
+            {
+                problem = !_theCourier.doDeliver(_msg);  // doDeliver returns true on success
+            }
+            catch (final Exception ex)
+            {
+                _logger.warn("InVMXAResource caught exception delivering to the InVM queue", ex);
+                problem = true;  // oops!
+            }
+            if (problem)  // not a lot we can do at this stage
+            {
+                _logger.warn("InVMXAResource failed to commit to the InVM queue!");
+                throw new XAException(XAException.XA_HEURHAZ);
+            }
+        }
+    }
+
+    /** Nothing to do when work on behalf of a transaction branch ends. */
+    public void end (Xid xid, int flags) throws XAException
+    {
+    }
+
+    /** Nothing to forget: this resource keeps no record of heuristic outcomes. */
+    public void forget (Xid xid) throws XAException
+    {
+    }
+
+    /** @return the value last given to setTransactionTimeout; it is stored but not acted on. */
+    public int getTransactionTimeout () throws XAException
+    {
+        return _timeout;
+    }
+
+    /** Always votes to commit: the queue is only touched in commit or rollback. */
+    public int prepare (Xid xid) throws XAException
+    {
+        return XAResource.XA_OK;
+    }
+
+    /**
+     * There is nothing to recover.
+     * @return an empty array rather than null, so callers can iterate the result safely.
+     * @see javax.transaction.xa.XAResource#recover(int)
+     */
+    public Xid[] recover (int flag) throws XAException
+    {
+        return new Xid[0];
+    }
+
+    /**
+     * During rollback we put the message back on the queue (tail) if we were receiving
+     * a message. If we were delivering then there is nothing to do because we did not
+     * update the queue directly, but rely on this instance to do it if the transaction
+     * commits.
+     * @throws XAException with XA_HEURHAZ if the message could not be put back on the queue.
+     */
+    public void rollback (Xid xid) throws XAException
+    {
+        if ((_opcode == operation.REMOVE) && (_msg != null))  // a pickup that found nothing enlists a null message
+        {
+            /*
+             * The message goes back on the queue. This may not be the same location as it
+             * had previously, but any attempt to do a truly transactional queue will affect
+             * performance adversely, for relatively little benefit. In an asynchronous world,
+             * applications should not be written assuming that a queue (or any transport)
+             * guarantees ordering. The ordering (or lack thereof) should be dealt with at the
+             * application level, where possible.
+             *
+             * TODO we could add a queue-insertion policy that allows the developer to override
+             * how the message gets placed back into the queue.
+             */
+            boolean problem;
+            try
+            {
+                problem = !_theCourier.doRedeliver(_msg);  // doRedeliver returns true on success
+            }
+            catch (final Exception ex)
+            {
+                _logger.warn("InVMXAResource caught exception returning message to the InVM queue", ex);
+                problem = true;
+            }
+            if (problem)  // shouldn't get here, but ...
+            {
+                _logger.warn("InVMXAResource could not rollback and put Message on to InVM queue!");
+                throw new XAException(XAException.XA_HEURHAZ);
+            }
+        }
+    }
+
+    /** Records the timeout so that getTransactionTimeout can report it; always returns true. */
+    public boolean setTransactionTimeout (int seconds) throws XAException
+    {
+        _timeout = seconds;
+
+        return true;
+    }
+
+    /** Nothing to do when work on behalf of a transaction branch starts. */
+    public void start (Xid xid, int flags) throws XAException
+    {
+    }
+
+    /** @return the message this resource was created with; may be null. */
+    public Message getMessage ()
+    {
+        return _msg;
+    }
+
+    /** Only this very instance counts as the same resource manager. */
+    public boolean isSameRM (XAResource xares) throws XAException
+    {
+        return (xares == this);
+    }
+
+    private final InVMCourier _theCourier;
+    private final Message _msg;
+    private final operation _opcode;
+    private int _timeout;
+
+    protected static Logger _logger = Logger.getLogger(InVMXAResource.class);
+}
\ No newline at end of file




More information about the jboss-svn-commits mailing list