[jboss-cvs] JBossAS SVN: r67638 - trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 29 19:57:14 EST 2007


Author: bdecoste
Date: 2007-11-29 19:57:13 -0500 (Thu, 29 Nov 2007)
New Revision: 67638

Modified:
   trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java
Log:
[aspatch-240] Merge TIBCO EMS HA session handling fixes

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java	2007-11-30 00:57:04 UTC (rev 67637)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java	2007-11-30 00:57:13 UTC (rev 67638)
@@ -120,6 +120,46 @@
    
    private boolean redeliverUnspecified = true;
    
+   private int transactionTimeout;
+   
+   private Boolean isSameRMOverrideValue;
+   
+   private boolean forceClearOnShutdown = false;
+   
+   private long forceClearOnShutdownInterval = 1000;
+   
+   private int forceClearAttempts = 0;
+   
+   public void setForceClearOnShutdown(boolean forceClear)
+   {
+      this.forceClearOnShutdown = forceClear;
+   }   
+   
+   public boolean isForceClearOnShutdown()
+   {
+      return this.forceClearOnShutdown;
+   }
+   
+   public long getForceClearOnShutdownInterval()
+   {
+      return this.forceClearOnShutdownInterval;
+   }
+   
+   public void setForceClearOnShutdownInterval(long forceClearOnShutdownInterval)
+   {
+      this.forceClearOnShutdownInterval = forceClearOnShutdownInterval;
+   }
+   
+   public int getForceClearAttempts()
+   {
+      return forceClearAttempts;
+   }
+   
+   public void setForceClearAttempts(int forceClearAttempts)
+   {
+      this.forceClearAttempts = forceClearAttempts;
+   }
+
    /**
     * @return the acknowledgeMode.
     */
@@ -670,4 +710,25 @@
    {
       this.redeliverUnspecified = redeliverUnspecified;
    }
+   
+   public int getTransactionTimeout()
+   {
+      return transactionTimeout;
+   }
+
+   public void setTransactionTimeout(int transactionTimeout)
+   {
+      this.transactionTimeout = transactionTimeout;
+   }
+   
+   public Boolean getIsSameRMOverrideValue()
+   {
+      return isSameRMOverrideValue;
+   }
+
+   public void setIsSameRMOverrideValue(Boolean isSameRMOverrideValue)
+   {
+      this.isSameRMOverrideValue = isSameRMOverrideValue;
+   }
+
 }
\ No newline at end of file

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java	2007-11-30 00:57:04 UTC (rev 67637)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java	2007-11-30 00:57:13 UTC (rev 67638)
@@ -1,24 +1,24 @@
 /*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
 package org.jboss.resource.adapter.jms.inflow;
 
 import javax.jms.Connection;
@@ -47,49 +47,50 @@
  * A generic jms session pool.
  * 
  * @author <a href="adrian at jboss.com">Adrian Brock</a>
- * @author <a href="mailto:weston.price at jboss.com>Weston Price</a> 
+ * @author <a href="mailto:weston.price at jboss.com>Weston Price</a>
  * @version $Revision$
  */
-public class JmsServerSession implements ServerSession, MessageListener, Work, WorkListener
+public class JmsServerSession implements ServerSession, MessageListener, Work,
+      WorkListener
 {
    /** The log */
    private static final Logger log = Logger.getLogger(JmsServerSession.class);
-   
+
    /** The session pool */
    JmsServerSessionPool pool;
-   
+
    /** The transacted flag */
    boolean transacted;
-   
+
    /** The acknowledge mode */
    int acknowledge;
-   
+
    /** The session */
    Session session;
-   
+
    /** Any XA session */
    XASession xaSession;
-   
+
    /** The endpoint */
    MessageEndpoint endpoint;
-   
+
    /** Any DLQ handler */
    DLQHandler dlqHandler;
-      
+
    TransactionDemarcationStrategy txnStrategy;
-   
-   
+
    /**
     * Create a new JmsServerSession
     * 
-    * @param pool the server session pool
+    * @param pool
+    *           the server session pool
     */
    public JmsServerSession(JmsServerSessionPool pool)
    {
       this.pool = pool;
-      
+
    }
-   
+
    /**
     * Setup the session
     */
@@ -99,35 +100,36 @@
       JmsActivationSpec spec = activation.getActivationSpec();
 
       dlqHandler = activation.getDLQHandler();
-      
+
       Connection connection = activation.getConnection();
 
       // Create the session
-      if (connection instanceof XAConnection && activation.isDeliveryTransacted())
+      if (connection instanceof XAConnection
+            && activation.isDeliveryTransacted())
       {
          xaSession = ((XAConnection) connection).createXASession();
          session = xaSession.getSession();
-      }
-      else
-      {         
-         transacted = spec.isSessionTransacted();         
-         acknowledge = spec.getAcknowledgeModeInt();         
+      } else
+      {
+         transacted = spec.isSessionTransacted();
+         acknowledge = spec.getAcknowledgeModeInt();
          session = connection.createSession(transacted, acknowledge);
       }
-      
+
       // Get the endpoint
-      MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
+      MessageEndpointFactory endpointFactory = activation
+            .getMessageEndpointFactory();
       XAResource xaResource = null;
 
       if (activation.isDeliveryTransacted() && xaSession != null)
          xaResource = xaSession.getXAResource();
-      
+
       endpoint = endpointFactory.createEndpoint(xaResource);
-      
+
       // Set the message listener
       session.setMessageListener(this);
    }
-   
+
    /**
     * Stop the session
     */
@@ -137,8 +139,7 @@
       {
          if (endpoint != null)
             endpoint.release();
-      }
-      catch (Throwable t)
+      } catch (Throwable t)
       {
          log.debug("Error releasing endpoint " + endpoint, t);
       }
@@ -147,8 +148,7 @@
       {
          if (xaSession != null)
             xaSession.close();
-      }
-      catch (Throwable t)
+      } catch (Throwable t)
       {
          log.debug("Error releasing xaSession " + xaSession, t);
       }
@@ -157,31 +157,30 @@
       {
          if (session != null)
             session.close();
-      }
-      catch (Throwable t)
+      } catch (Throwable t)
       {
          log.debug("Error releasing session " + session, t);
       }
    }
-   
+
    public void onMessage(Message message)
    {
       try
       {
          endpoint.beforeDelivery(JmsActivation.ONMESSAGE);
-    
+
          try
          {
-            if (dlqHandler == null || dlqHandler.handleRedeliveredMessage(message) == false)
+            if (dlqHandler == null
+                  || dlqHandler.handleRedeliveredMessage(message) == false)
             {
-               MessageListener listener = (MessageListener)endpoint;
+               MessageListener listener = (MessageListener) endpoint;
                listener.onMessage(message);
             }
-         }
-         finally
+         } finally
          {
             endpoint.afterDelivery();
-            
+
             if (dlqHandler != null)
                dlqHandler.messageDelivered(message);
          }
@@ -190,13 +189,12 @@
       catch (Throwable t)
       {
          log.error("Unexpected error delivering message " + message, t);
-         
-         if(txnStrategy != null)
+
+         if (txnStrategy != null)
             txnStrategy.error();
-         
+
       }
-      
-   
+
    }
 
    public Session getSession() throws JMSException
@@ -206,13 +204,12 @@
 
    public void start() throws JMSException
    {
-      JmsActivation activation = pool.getActivation(); 
+      JmsActivation activation = pool.getActivation();
       WorkManager workManager = activation.getWorkManager();
       try
       {
          workManager.scheduleWork(this, 0, null, this);
-      }
-      catch (WorkException e)
+      } catch (WorkException e)
       {
          log.error("Unable to schedule work", e);
          throw new JMSException("Unable to schedule work: " + e.toString());
@@ -221,46 +218,45 @@
 
    public void run()
    {
-      
+
       try
       {
          txnStrategy = createTransactionDemarcation();
-         
-      }catch(Throwable t)
+
+      } catch (Throwable t)
       {
          log.error("Error creating transaction demarcation. Cannot continue.");
          return;
       }
-      
-      
+
       try
-      {         
+      {
          session.run();
-      }      
-      catch(Throwable t)
+      } catch (Throwable t)
       {
          if (txnStrategy != null)
             txnStrategy.error();
-         
-      }finally
+
+      } finally
       {
-         if(txnStrategy != null)
+         if (txnStrategy != null)
             txnStrategy.end();
 
          txnStrategy = null;
       }
-      
+
    }
-   
+
    private TransactionDemarcationStrategy createTransactionDemarcation()
    {
       return new DemarcationStrategyFactory().getStrategy();
-      
+
    }
+
    public void release()
    {
    }
-   
+
    public void workAccepted(WorkEvent e)
    {
    }
@@ -275,154 +271,170 @@
       pool.returnServerSession(this);
    }
 
-
    public void workStarted(WorkEvent e)
    {
    }
-   
+
    private class DemarcationStrategyFactory
    {
-      
+
       TransactionDemarcationStrategy getStrategy()
       {
          TransactionDemarcationStrategy current = null;
-         final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
+         final JmsActivationSpec spec = pool.getActivation()
+               .getActivationSpec();
          final JmsActivation activation = pool.getActivation();
-         
-         if(activation.isDeliveryTransacted() && xaSession != null)
+
+         if (activation.isDeliveryTransacted() && xaSession != null)
          {
             try
             {
                current = new XATransactionDemarcationStrategy();
-            }    
-            catch (Throwable t)
+            } catch (Throwable t)
             {
                log.error(this + " error creating transaction demarcation ", t);
-            }         
-          
-         }else
+            }
+
+         } else
          {
-                        
-               return new LocalDemarcationStrategy();               
-            
+
+            return new LocalDemarcationStrategy();
+
          }
-         
+
          return current;
       }
-   
+
    }
+
    private interface TransactionDemarcationStrategy
    {
       void error();
+
       void end();
-      
+
    }
-   
-   private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
+
+   private class LocalDemarcationStrategy implements
+         TransactionDemarcationStrategy
    {
       public void end()
       {
-         final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
-         
-         if(spec.isSessionTransacted())
+         final JmsActivationSpec spec = pool.getActivation()
+               .getActivationSpec();
+
+         if (spec.isSessionTransacted())
          {
-            if(session != null)
+            if (session != null)
             {
                try
                {
                   session.commit();
-               }
-               catch (JMSException e)
+               } catch (JMSException e)
                {
                   log.error("Failed to commit session transaction", e);
                }
             }
          }
       }
-      
+
       public void error()
       {
-         final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
-         
-         if(spec.isSessionTransacted())
+         final JmsActivationSpec spec = pool.getActivation()
+               .getActivationSpec();
+
+         if (spec.isSessionTransacted())
          {
-            if(session != null)
-               
+            if (session != null)
+
                try
                {
                   /*
                    * Looks strange, but this basically means
                    * 
-                   * If the underlying connection was non-XA and the transaction attribute is REQUIRED
-                   * we rollback. Also, if the underlying connection was non-XA and the transaction
-                   * attribute is NOT_SUPPORT and the non standard redelivery behavior is enabled 
-                   * we rollback to force redelivery.
+                   * If the underlying connection was non-XA and the transaction
+                   * attribute is REQUIRED we rollback. Also, if the underlying
+                   * connection was non-XA and the transaction attribute is
+                   * NOT_SUPPORT and the non standard redelivery behavior is
+                   * enabled we rollback to force redelivery.
                    * 
                    */
-                  if(pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified())
+                  if (pool.getActivation().isDeliveryTransacted()
+                        || spec.getRedeliverUnspecified())
                   {
-                     session.rollback();                     
+                     session.rollback();
                   }
-               
-               }
-               catch (JMSException e)
+
+               } catch (JMSException e)
                {
                   log.error("Failed to rollback session transaction", e);
                }
-            
+
          }
       }
-   
+
    }
 
-   private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
+   private class XATransactionDemarcationStrategy implements
+         TransactionDemarcationStrategy
    {
-	 
-	  boolean trace = log.isTraceEnabled();
-	  
+
+      boolean trace = log.isTraceEnabled();
+
       Transaction trans = null;
+
       TransactionManager tm = pool.getActivation().getTransactionManager();;
-      
+
       public XATransactionDemarcationStrategy() throws Throwable
       {
-            
-            tm.begin();
+         final int timeout = pool.getActivation().getActivationSpec()
+               .getTransactionTimeout();
 
-            try
-            {
-               trans = tm.getTransaction();
+         if (timeout > 0)
+         {
+            log.trace("Setting transactionTimeout for JMSSessionPool to "
+                  + timeout);
+            tm.setTransactionTimeout(timeout);
 
-               if (trace)
-                  log.trace(JmsServerSession.this + " using tx=" + trans);
+         }
 
-               if (xaSession != null)
-               {
-                  XAResource res = xaSession.getXAResource();
+         tm.begin();
 
-                  if (!trans.enlistResource(res))
-                  {
-                     throw new JMSException("could not enlist resource");
-                  }
-                  if (trace)
-                     log.trace(JmsServerSession.this + " XAResource '" + res + "' enlisted.");
-               }
-            }
-            catch (Throwable t)
+         try
+         {
+            trans = tm.getTransaction();
+
+            if (trace)
+               log.trace(JmsServerSession.this + " using tx=" + trans);
+
+            if (xaSession != null)
             {
-               try
+               XAResource res = xaSession.getXAResource();
+
+               if (!trans.enlistResource(res))
                {
-                  tm.rollback();
+                  throw new JMSException("could not enlist resource");
                }
-               catch (Throwable ignored)
-               {
-                  log.trace(JmsServerSession.this + " ignored error rolling back after failed enlist", ignored);
-               }
-               throw t;
+               if (trace)
+                  log.trace(JmsServerSession.this + " XAResource '" + res
+                        + "' enlisted.");
             }
+         } catch (Throwable t)
+         {
+            try
+            {
+               tm.rollback();
+            } catch (Throwable ignored)
+            {
+               log.trace(JmsServerSession.this
+                     + " ignored error rolling back after failed enlist",
+                     ignored);
+            }
+            throw t;
+         }
 
-       }
-         
-      
+      }
+
       public void error()
       {
          // Mark for tollback TX via TM
@@ -430,38 +442,45 @@
          {
 
             if (trace)
-               log.trace(JmsServerSession.this + " using TM to mark TX for rollback tx=" + trans);
+               log.trace(JmsServerSession.this
+                     + " using TM to mark TX for rollback tx=" + trans);
             trans.setRollbackOnly();
-         }
-         catch (Throwable t)
+         } catch (Throwable t)
          {
-            log.error(JmsServerSession.this + " failed to set rollback only", t);
+            log
+                  .error(
+                        JmsServerSession.this + " failed to set rollback only",
+                        t);
          }
 
       }
-      
+
       public void end()
       {
          try
          {
 
-            // Use the TM to commit the Tx (assert the correct association) 
+            // Use the TM to commit the Tx (assert the correct association)
             Transaction currentTx = tm.getTransaction();
             if (trans.equals(currentTx) == false)
-               throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
+               throw new IllegalStateException(
+                     "Wrong tx association: expected " + trans + " was "
+                           + currentTx);
 
             // Marked rollback
             if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
             {
                if (trace)
-                  log.trace(JmsServerSession.this + " rolling back JMS transaction tx=" + trans);
+                  log.trace(JmsServerSession.this
+                        + " rolling back JMS transaction tx=" + trans);
                // actually roll it back
                tm.rollback();
 
                // NO XASession? then manually rollback.
                // This is not so good but
                // it's the best we can do if we have no XASession.
-               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               if (xaSession == null
+                     && pool.getActivation().isDeliveryTransacted())
                {
                   session.rollback();
                }
@@ -474,30 +493,31 @@
                // a) everything goes well
                // b) app. exception was thrown
                if (trace)
-                  log.trace(JmsServerSession.this + " commiting the JMS transaction tx=" + trans);
+                  log.trace(JmsServerSession.this
+                        + " commiting the JMS transaction tx=" + trans);
                tm.commit();
 
-               // NO XASession? then manually commit.  This is not so good but
+               // NO XASession? then manually commit. This is not so good but
                // it's the best we can do if we have no XASession.
-               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               if (xaSession == null
+                     && pool.getActivation().isDeliveryTransacted())
                {
                   session.commit();
                }
-            
-            }else
+
+            } else
             {
                tm.suspend();
-               
-               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+
+               if (xaSession == null
+                     && pool.getActivation().isDeliveryTransacted())
                {
                   session.rollback();
                }
-               
-               
+
             }
 
-         }
-         catch (Throwable t)
+         } catch (Throwable t)
          {
             log.error(JmsServerSession.this + " failed to commit/rollback", t);
          }

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java	2007-11-30 00:57:04 UTC (rev 67637)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java	2007-11-30 00:57:13 UTC (rev 67638)
@@ -220,15 +220,41 @@
          sessionCount -= serverSessions.size();
          serverSessions.clear();
 
-         // Wait for inuse sessions
-         while (sessionCount > 0)
-         {
-            try
+         if (activation.getActivationSpec().isForceClearOnShutdown())
+         {        
+            int attempts = 0;
+            int forceClearAttempts = activation.getActivationSpec().getForceClearAttempts();
+            long forceClearInterval = activation.getActivationSpec().getForceClearOnShutdownInterval();
+            
+            log.info("Force clear behavior in effect. Waiting for " + forceClearInterval + " milliseconds for " + forceClearAttempts + " attempts.");
+           
+            while((sessionCount > 0) && (attempts < forceClearAttempts))
             {
-               serverSessions.wait();
+               try
+               {
+                  serverSessions.wait(forceClearInterval);
+                  log.trace("Clear attempt " + attempts); 
+                  ++attempts;               
+            
+               }catch(InterruptedException ignore)
+               {
+                  
+               }
+            
             }
-            catch (InterruptedException ignore)
+         }
+         else
+         {
+            // Wait for inuse sessions
+            while (sessionCount > 0)
             {
+               try
+               {
+                  serverSessions.wait();
+               }
+               catch (InterruptedException ignore)
+               {
+               }
             }
          }
       }




More information about the jboss-cvs-commits mailing list