[jboss-cvs] JBossAS SVN: r60397 - branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Feb 7 14:25:23 EST 2007


Author: weston.price at jboss.com
Date: 2007-02-07 14:25:22 -0500 (Wed, 07 Feb 2007)
New Revision: 60397

Modified:
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
Log:
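[JBAS-1434][JBAS-3321] Removed the reconnect attempts limit: handleFailure now keeps trying to
reconnect for as long as delivery is active. Added setTransactionTimeout to the activation spec;
the timeout (when greater than zero) is applied to the transaction manager for the
JmsServerSessionPool sessions before the XA transaction is begun.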

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java	2007-02-07 18:31:53 UTC (rev 60396)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java	2007-02-07 19:25:22 UTC (rev 60397)
@@ -229,9 +229,8 @@
    public void handleFailure(Throwable failure)
    {
       log.warn("Failure in jms activation " + spec, failure);
-      int reconnectCount = 0;
       
-      while (deliveryActive.get() && reconnectCount < spec.getReconnectAttempts())
+      while (deliveryActive.get())
       {
          teardown();
          try
@@ -256,7 +255,6 @@
             log.error("Unable to reconnect " + spec, t);
          }
          
-         ++reconnectCount;
 
       }
    }

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java	2007-02-07 18:31:53 UTC (rev 60396)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java	2007-02-07 19:25:22 UTC (rev 60397)
@@ -115,12 +115,11 @@
    /** The DLQ max resent  */
    private int dLQMaxResent = 5;
    
-   //Default to 5 attempts
-   private int reconnectAttempts = 5;
-   
    //Used to specify whether or not we should attempt to redeliver a message in an unspecified txn context
    private boolean redeliverUnspecified = true;
    
+   private int transactionTimeout;
+   
    /**
     * @return the acknowledgeMode.
     */
@@ -652,23 +651,26 @@
       return buffer.toString();
    }
 
-   public int getReconnectAttempts()
+   public boolean getRedeliverUnspecified()
    {
-      return reconnectAttempts;
+      return redeliverUnspecified;
    }
 
-   public void setReconnectAttempts(int reconnectAttempts)
+   public void setRedeliverUnspecified(boolean redeliverUnspecified)
    {
-      this.reconnectAttempts = reconnectAttempts;
+      this.redeliverUnspecified = redeliverUnspecified;
    }
 
-   public boolean getRedeliverUnspecified()
+   public int getTransactionTimeout()
    {
-      return redeliverUnspecified;
+      return transactionTimeout;
    }
 
-   public void setRedeliverUnspecified(boolean redeliverUnspecified)
+   public void setTransactionTimeout(int transactionTimeout)
    {
-      this.redeliverUnspecified = redeliverUnspecified;
+      this.transactionTimeout = transactionTimeout;
    }
+   
+   
+
 }
\ No newline at end of file

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java	2007-02-07 18:31:53 UTC (rev 60396)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java	2007-02-07 19:25:22 UTC (rev 60397)
@@ -55,31 +55,30 @@
 {
    /** The log */
    private static final Logger log = Logger.getLogger(JmsServerSession.class);
-   
+
    /** The session pool */
    JmsServerSessionPool pool;
-   
+
    /** The transacted flag */
    boolean transacted;
-   
+
    /** The acknowledge mode */
    int acknowledge;
-   
+
    /** The session */
    Session session;
-   
+
    /** Any XA session */
    XASession xaSession;
-   
+
    /** The endpoint */
    MessageEndpoint endpoint;
-   
+
    /** Any DLQ handler */
    DLQHandler dlqHandler;
-      
+
    TransactionDemarcationStrategy txnStrategy;
-   
-   
+
    /**
     * Create a new JmsServerSession
     * 
@@ -88,9 +87,9 @@
    public JmsServerSession(JmsServerSessionPool pool)
    {
       this.pool = pool;
-      
+
    }
-   
+
    /**
     * Setup the session
     */
@@ -100,7 +99,7 @@
       JmsActivationSpec spec = activation.getActivationSpec();
 
       dlqHandler = activation.getDLQHandler();
-      
+
       Connection connection = activation.getConnection();
 
       // Create the session
@@ -110,25 +109,25 @@
          session = xaSession.getSession();
       }
       else
-      {         
-         transacted = spec.isSessionTransacted();         
-         acknowledge = spec.getAcknowledgeModeInt();         
+      {
+         transacted = spec.isSessionTransacted();
+         acknowledge = spec.getAcknowledgeModeInt();
          session = connection.createSession(transacted, acknowledge);
       }
-      
+
       // Get the endpoint
       MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
       XAResource xaResource = null;
 
       if (activation.isDeliveryTransacted() && xaSession != null)
          xaResource = xaSession.getXAResource();
-      
+
       endpoint = endpointFactory.createEndpoint(xaResource);
-      
+
       // Set the message listener
       session.setMessageListener(this);
    }
-   
+
    /**
     * Stop the session
     */
@@ -164,25 +163,25 @@
          log.debug("Error releasing session " + session, t);
       }
    }
-   
+
    public void onMessage(Message message)
    {
       try
       {
          endpoint.beforeDelivery(JmsActivation.ONMESSAGE);
-    
+
          try
          {
             if (dlqHandler == null || dlqHandler.handleRedeliveredMessage(message) == false)
             {
-               MessageListener listener = (MessageListener)endpoint;
+               MessageListener listener = (MessageListener) endpoint;
                listener.onMessage(message);
             }
          }
          finally
          {
             endpoint.afterDelivery();
-            
+
             if (dlqHandler != null)
                dlqHandler.messageDelivered(message);
          }
@@ -191,13 +190,12 @@
       catch (Throwable t)
       {
          log.error("Unexpected error delivering message " + message, t);
-         
-         if(txnStrategy != null)
+
+         if (txnStrategy != null)
             txnStrategy.error();
-         
+
       }
-      
-   
+
    }
 
    public Session getSession() throws JMSException
@@ -207,7 +205,7 @@
 
    public void start() throws JMSException
    {
-      JmsActivation activation = pool.getActivation(); 
+      JmsActivation activation = pool.getActivation();
       WorkManager workManager = activation.getWorkManager();
       try
       {
@@ -222,46 +220,48 @@
 
    public void run()
    {
-      
+
       try
       {
          txnStrategy = createTransactionDemarcation();
-         
-      }catch(Throwable t)
+
+      }
+      catch (Throwable t)
       {
          log.error("Error creating transaction demarcation. Cannot continue.");
          return;
       }
-      
-      
+
       try
-      {         
+      {
          session.run();
-      }      
-      catch(Throwable t)
+      }
+      catch (Throwable t)
       {
          if (txnStrategy != null)
             txnStrategy.error();
-         
-      }finally
+
+      }
+      finally
       {
-         if(txnStrategy != null)
+         if (txnStrategy != null)
             txnStrategy.end();
 
          txnStrategy = null;
       }
-      
+
    }
-   
+
    private TransactionDemarcationStrategy createTransactionDemarcation()
    {
       return new DemarcationStrategyFactory().getStrategy();
-      
+
    }
+
    public void release()
    {
    }
-   
+
    public void workAccepted(WorkEvent e)
    {
    }
@@ -276,58 +276,60 @@
       pool.returnServerSession(this);
    }
 
-
    public void workStarted(WorkEvent e)
    {
    }
-   
+
    private class DemarcationStrategyFactory
    {
-      
+
       TransactionDemarcationStrategy getStrategy()
       {
          TransactionDemarcationStrategy current = null;
          final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
          final JmsActivation activation = pool.getActivation();
-         
-         if(activation.isDeliveryTransacted() && xaSession != null)
+
+         if (activation.isDeliveryTransacted() && xaSession != null)
          {
             try
             {
                current = new XATransactionDemarcationStrategy();
-            }    
+            }
             catch (Throwable t)
             {
                log.error(this + " error creating transaction demarcation ", t);
-            }         
-          
-         }else
+            }
+
+         }
+         else
          {
-                        
-               return new LocalDemarcationStrategy();               
-            
+
+            return new LocalDemarcationStrategy();
+
          }
-         
+
          return current;
       }
-   
+
    }
+
    private interface TransactionDemarcationStrategy
    {
       void error();
+
       void end();
-      
+
    }
-   
+
    private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
    {
       public void end()
       {
          final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
-         
-         if(spec.isSessionTransacted())
+
+         if (spec.isSessionTransacted())
          {
-            if(session != null)
+            if (session != null)
             {
                try
                {
@@ -340,15 +342,15 @@
             }
          }
       }
-      
+
       public void error()
       {
          final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
-         
-         if(spec.isSessionTransacted())
+
+         if (spec.isSessionTransacted())
          {
-            if(session != null)
-               
+            if (session != null)
+
                try
                {
                   /*
@@ -360,70 +362,79 @@
                    * we rollback to force redelivery.
                    * 
                    */
-                  if(pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified())
+                  if (pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified())
                   {
-                     session.rollback();                     
+                     session.rollback();
                   }
-               
+
                }
                catch (JMSException e)
                {
                   log.error("Failed to rollback session transaction", e);
                }
-            
+
          }
       }
-   
+
    }
 
    private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
    {
-     
+
       boolean trace = log.isTraceEnabled();
-      
+
       Transaction trans = null;
+
       TransactionManager tm = pool.getActivation().getTransactionManager();;
-      
+
       public XATransactionDemarcationStrategy() throws Throwable
       {
-            
-            tm.begin();
 
-            try
-            {
-               trans = tm.getTransaction();
+         final int timeout = pool.getActivation().getActivationSpec().getTransactionTimeout();
 
-               if (trace)
-                  log.trace(JmsServerSession.this + " using tx=" + trans);
+         if (timeout > 0)
+         {
+            log.trace("Setting transactionTimeout for JMSSessionPool to " + timeout);
+            tm.setTransactionTimeout(timeout);
 
-               if (xaSession != null)
-               {
-                  XAResource res = xaSession.getXAResource();
+         }
 
-                  if (!trans.enlistResource(res))
-                  {
-                     throw new JMSException("could not enlist resource");
-                  }
-                  if (trace)
-                     log.trace(JmsServerSession.this + " XAResource '" + res + "' enlisted.");
-               }
-            }
-            catch (Throwable t)
+         tm.begin();
+
+         try
+         {
+            trans = tm.getTransaction();
+
+            if (trace)
+               log.trace(JmsServerSession.this + " using tx=" + trans);
+
+            if (xaSession != null)
             {
-               try
+               XAResource res = xaSession.getXAResource();
+
+               if (!trans.enlistResource(res))
                {
-                  tm.rollback();
+                  throw new JMSException("could not enlist resource");
                }
-               catch (Throwable ignored)
-               {
-                  log.trace(JmsServerSession.this + " ignored error rolling back after failed enlist", ignored);
-               }
-               throw t;
+               if (trace)
+                  log.trace(JmsServerSession.this + " XAResource '" + res + "' enlisted.");
             }
+         }
+         catch (Throwable t)
+         {
+            try
+            {
+               tm.rollback();
+            }
+            catch (Throwable ignored)
+            {
+               log.trace(JmsServerSession.this + " ignored error rolling back after failed enlist", ignored);
+            }
+            throw t;
+         }
 
-       }
-         
-      
+      }
+
       public void error()
       {
          // Mark TX for rollback via TM
@@ -440,7 +451,7 @@
          }
 
       }
-      
+
       public void end()
       {
          try
@@ -484,17 +495,17 @@
                {
                   session.commit();
                }
-            
-            }else
+
+            }
+            else
             {
                tm.suspend();
-               
+
                if (xaSession == null && pool.getActivation().isDeliveryTransacted())
                {
                   session.rollback();
                }
-               
-               
+
             }
 
          }




More information about the jboss-cvs-commits mailing list