[Jboss-cvs] JBossAS SVN: r56851 - in branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms: . inflow inflow/dlq

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Sep 14 11:21:24 EDT 2006


Author: bill.burke at jboss.com
Date: 2006-09-14 11:21:22 -0400 (Thu, 14 Sep 2006)
New Revision: 56851

Added:
   branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/DefaultRuntimeErrorHandler.java
   branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/RuntimeErrorHandler.java
Modified:
   branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
   branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
   branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
   branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
   branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java
   branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/dlq/AbstractDLQHandler.java
Log:
backmerge

Modified: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
===================================================================
--- branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java	2006-09-14 15:20:41 UTC (rev 56850)
+++ branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java	2006-09-14 15:21:22 UTC (rev 56851)
@@ -108,12 +108,6 @@
       if (trace)
          log.trace("created new managed connection: " + mc);
 
-      // Set default logwriter according to spec
-
-      // 
-      // jason: screw the logWriter stuff for now it sucks ass
-      //
-
       return mc;
    }
 

Copied: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/DefaultRuntimeErrorHandler.java (from rev 56850, branches/Branch_4_0/connector/src/main/org/jboss/resource/adapter/jms/inflow/DefaultRuntimeErrorHandler.java)

Modified: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
===================================================================
--- branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java	2006-09-14 15:20:41 UTC (rev 56850)
+++ branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java	2006-09-14 15:21:22 UTC (rev 56851)
@@ -42,10 +42,12 @@
 import javax.resource.spi.endpoint.MessageEndpointFactory;
 import javax.resource.spi.work.Work;
 import javax.resource.spi.work.WorkManager;
+import javax.transaction.TransactionManager;
 
 import org.jboss.jms.jndi.JMSProviderAdapter;
 import org.jboss.logging.Logger;
 import org.jboss.resource.adapter.jms.JmsResourceAdapter;
+import org.jboss.tm.TransactionManagerLocator;
 import org.jboss.util.Strings;
 import org.jboss.util.naming.Util;
 
@@ -95,6 +97,10 @@
    /** The DLQ handler */
    protected DLQHandler dlqHandler;
    
+   /** The TransactionManager */
+   protected TransactionManager tm;
+   
+   
    static
    {
       try
@@ -153,6 +159,17 @@
    {
       return ra.getWorkManager();
    }
+   
+   public TransactionManager getTransactionManager()
+   {
+       if(tm == null)
+       {
+          tm = TransactionManagerLocator.getInstance().locate();
+          
+       }
+       
+       return tm;
+   }
 
    /**
     * @return the connection
@@ -212,6 +229,7 @@
    public void handleFailure(Throwable failure)
    {
       log.warn("Failure in jms activation " + spec, failure);
+      boolean reconnected = false;
       
       while (deliveryActive.get())
       {
@@ -230,11 +248,18 @@
          try
          {
             setup();
+            reconnected = true;
          }
          catch (Throwable t)
          {
             log.error("Unable to reconnect " + spec, t);
          }
+         
+         if(reconnected)
+         {
+            log.info("Reconnected to JMS provider " + spec);
+            break;
+         }
       }
    }
 

Modified: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
===================================================================
--- branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java	2006-09-14 15:20:41 UTC (rev 56850)
+++ branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java	2006-09-14 15:21:22 UTC (rev 56851)
@@ -91,7 +91,7 @@
    /** The keep alive time for sessions */
    private long keepAlive = 60000;
 
-   /** Is the ession transacted */
+   /** Is the session transacted */
    private boolean sessionTransacted = true;
 
    /** The DLQ handler class */

Modified: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
===================================================================
--- branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java	2006-09-14 15:20:41 UTC (rev 56850)
+++ branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java	2006-09-14 15:21:22 UTC (rev 56851)
@@ -36,6 +36,9 @@
 import javax.resource.spi.work.WorkException;
 import javax.resource.spi.work.WorkListener;
 import javax.resource.spi.work.WorkManager;
+import javax.transaction.Status;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAResource;
 
 import org.jboss.logging.Logger;
@@ -44,6 +47,7 @@
  * A generic jms session pool.
  * 
  * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="mailto:weston.price at jboss.com">Weston Price</a> 
  * @version $Revision$
  */
 public class JmsServerSession implements ServerSession, MessageListener, Work, WorkListener
@@ -71,7 +75,10 @@
    
    /** Any DLQ handler */
    DLQHandler dlqHandler;
-
+   
+   /** The runtimeHandler */
+   RuntimeErrorHandler runtimeHandler = new DefaultRuntimeErrorHandler();
+   
    /**
     * Create a new JmsServerSession
     * 
@@ -80,6 +87,7 @@
    public JmsServerSession(JmsServerSessionPool pool)
    {
       this.pool = pool;
+      
    }
    
    /**
@@ -156,9 +164,19 @@
    
    public void onMessage(Message message)
    {
+      TransactionDemarcationStrategy td = null;
+      
+      if (JmsServerSessionPool.USE_OLD)
+      {
+         td = createTransactionDemarcation();
+         if (td == null)
+            return;
+      }
+      
       try
       {
          endpoint.beforeDelivery(JmsActivation.ONMESSAGE);
+    
          try
          {
             if (dlqHandler == null || dlqHandler.handleRedeliveredMessage(message) == false)
@@ -171,10 +189,23 @@
                dlqHandler.messageDelivered(message);
          }
       }
+
       catch (Throwable t)
       {
          log.error("Unexpected error delivering message " + message, t);
+         
+         if (td != null)
+               td.error();
+         
+         runtimeHandler.handleRuntimeError(t);
+         
       }
+      finally
+      {
+         if (td != null)
+            td.end();
+      }
+   
    }
 
    public Session getSession() throws JMSException
@@ -199,9 +230,43 @@
 
    public void run()
    {
-      session.run();
+      TransactionDemarcationStrategy td = null;
+      
+      if (JmsServerSessionPool.USE_OLD == false)
+      {
+         td = createTransactionDemarcation();
+         
+         if (td == null)
+            return;
+      }
+      
+      try
+      {         
+         if (JmsServerSessionPool.USE_OLD && xaSession != null)
+            xaSession.run();
+         
+         else
+            session.run();
+
+      }
+      catch(Throwable t)
+      {
+         if (td != null)
+            td.error();
+         
+      }finally
+      {
+         if (td != null)
+            td.end();
+      }
+      
    }
-
+   
+   private TransactionDemarcationStrategy createTransactionDemarcation()
+   {
+      return new DemarcationStrategyFactory().getStrategy();
+      
+   }
    public void release()
    {
    }
@@ -224,4 +289,202 @@
    public void workStarted(WorkEvent e)
    {
    }
+   
+   private class DemarcationStrategyFactory
+   {
+      
+      public DemarcationStrategyFactory()
+      {
+      }
+
+      TransactionDemarcationStrategy getStrategy()
+      {
+         
+         if(pool.getActivation().isDeliveryTransacted())
+         {
+            try
+            {
+               return new XATransactionDemarcationStrategy();
+            }
+            catch (Throwable t)
+            {
+               log.error(this + " error creating transaction demarcation ", t);
+               return null;
+            }         
+          
+         }else
+         {
+            return new LocalDemarcationStrategy();
+            
+         }
+         
+      }
+   
+   }
+   private interface TransactionDemarcationStrategy
+   {
+      void error();
+      void end();
+      
+   }
+   
+   private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
+   {
+      public void end()
+      {
+         if(pool.getActivation().getActivationSpec().isSessionTransacted())
+         {
+            if(session != null)
+            {
+               try
+               {
+                  session.commit();
+               }
+               catch (JMSException e)
+               {
+                  log.error("Failed to commit session transaction", e);
+               }
+            }
+         }
+      }
+      
+      public void error()
+      {
+         if(pool.getActivation().getActivationSpec().isSessionTransacted())
+         {
+            if(session != null)
+               
+               try
+               {
+                  session.rollback();
+               }
+               catch (JMSException e)
+               {
+                  log.error("Failed to rollback session transaction", e);
+               }
+            
+         }
+      }
+   
+   }
+
+   private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
+   {
+	 
+	  boolean trace = log.isTraceEnabled();
+	  
+      Transaction trans = null;
+      TransactionManager tm = pool.getActivation().getTransactionManager();;
+      
+      public XATransactionDemarcationStrategy() throws Throwable
+      {
+            
+            tm.begin();
+
+            try
+            {
+               trans = tm.getTransaction();
+
+               if (trace)
+                  log.trace(JmsServerSession.this + " using tx=" + trans);
+
+               if (xaSession != null)
+               {
+                  XAResource res = xaSession.getXAResource();
+
+                  if (!trans.enlistResource(res))
+                  {
+                     throw new JMSException("could not enlist resource");
+                  }
+                  if (trace)
+                     log.trace(JmsServerSession.this + " XAResource '" + res + "' enlisted.");
+               }
+            }
+            catch (Throwable t)
+            {
+               try
+               {
+                  tm.rollback();
+               }
+               catch (Throwable ignored)
+               {
+                  log.trace(JmsServerSession.this + " ignored error rolling back after failed enlist", ignored);
+               }
+               throw t;
+            }
+
+       }
+         
+      
+      public void error()
+      {
+         // Mark for rollback TX via TM
+         try
+         {
+
+            if (trace)
+               log.trace(JmsServerSession.this + " using TM to mark TX for rollback tx=" + trans);
+            trans.setRollbackOnly();
+         }
+         catch (Throwable t)
+         {
+            log.error(JmsServerSession.this + " failed to set rollback only", t);
+         }
+
+      }
+      
+      public void end()
+      {
+         try
+         {
+
+            // Use the TM to commit the Tx (assert the correct association) 
+            Transaction currentTx = tm.getTransaction();
+            if (trans.equals(currentTx) == false)
+               throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
+
+            // Marked rollback
+            if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
+            {
+               if (trace)
+                  log.trace(JmsServerSession.this + " rolling back JMS transaction tx=" + trans);
+               // actually roll it back
+               tm.rollback();
+
+               // NO XASession? then manually rollback.
+               // This is not so good but
+               // it's the best we can do if we have no XASession.
+               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               {
+                  session.rollback();
+               }
+            }
+
+            else if (trans.getStatus() == Status.STATUS_ACTIVE)
+            {
+               // Commit tx
+               // This will happen if
+               // a) everything goes well
+               // b) app. exception was thrown
+               if (trace)
+                  log.trace(JmsServerSession.this + " commiting the JMS transaction tx=" + trans);
+               tm.commit();
+
+               // NO XASession? then manually commit.  This is not so good but
+               // it's the best we can do if we have no XASession.
+               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               {
+                  session.commit();
+               }
+            }
+
+         }
+         catch (Throwable t)
+         {
+            log.error(JmsServerSession.this + " failed to commit/rollback", t);
+         }
+
+      }
+
+   }
 }
\ No newline at end of file

Modified: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java
===================================================================
--- branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java	2006-09-14 15:20:41 UTC (rev 56850)
+++ branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java	2006-09-14 15:21:22 UTC (rev 56851)
@@ -21,6 +21,8 @@
 */
 package org.jboss.resource.adapter.jms.inflow;
 
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 
 import javax.jms.Connection;
@@ -43,7 +45,20 @@
 {
    /** The logger */
    private static final Logger log = Logger.getLogger(JmsServerSessionPool.class);
-
+   
+   public static final boolean USE_OLD;
+   
+   static
+   {
+      USE_OLD = ((Boolean) AccessController.doPrivileged(new PrivilegedAction()
+      {
+         public Object run()
+         {
+            return new Boolean(System.getProperty("org.jboss.jms.asf.useold", "false"));
+         }
+      })).booleanValue();
+   }
+   
    /** The activation */
    JmsActivation activation;
 
@@ -273,4 +288,5 @@
          log.debug("Error closing the consumer " + consumer, t);
       }
    }
+
 }
\ No newline at end of file

Copied: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/RuntimeErrorHandler.java (from rev 56850, branches/Branch_4_0/connector/src/main/org/jboss/resource/adapter/jms/inflow/RuntimeErrorHandler.java)

Modified: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/dlq/AbstractDLQHandler.java
===================================================================
--- branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/dlq/AbstractDLQHandler.java	2006-09-14 15:20:41 UTC (rev 56850)
+++ branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/dlq/AbstractDLQHandler.java	2006-09-14 15:21:22 UTC (rev 56851)
@@ -34,6 +34,7 @@
 import javax.jms.QueueConnectionFactory;
 import javax.jms.QueueSender;
 import javax.jms.QueueSession;
+import javax.jms.TopicConnectionFactory;
 import javax.naming.Context;
 
 import org.jboss.jms.jndi.JMSProviderAdapter;




More information about the jboss-cvs-commits mailing list