[jboss-cvs] JBossAS SVN: r58487 - trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 16 23:04:54 EST 2006


Author: weston.price at jboss.com
Date: 2006-11-16 23:04:52 -0500 (Thu, 16 Nov 2006)
New Revision: 58487

Modified:
   trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
Log:
Changes for JBoss 5.0 beta. No JIRA issue was logged for this work. Adds new
handling for message delivery in an unspecified transaction context, for BMT MDBs
as well as CMT MDBs whose transaction attribute is NOT_SUPPORTED.

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java	2006-11-17 04:03:21 UTC (rev 58486)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java	2006-11-17 04:04:52 UTC (rev 58487)
@@ -162,13 +162,13 @@
    
    public TransactionManager getTransactionManager()
    {
-       if(tm == null)
-       {
-          tm = TransactionManagerLocator.getInstance().locate();
-          
-       }
-       
-       return tm;
+      if (tm == null)
+      {
+         tm = TransactionManagerLocator.getInstance().locate();
+
+      }
+
+      return tm;
    }
 
    /**
@@ -229,9 +229,9 @@
    public void handleFailure(Throwable failure)
    {
       log.warn("Failure in jms activation " + spec, failure);
-      boolean reconnected = false;
+      int reconnectCount = 0;
       
-      while (deliveryActive.get())
+      while (deliveryActive.get() && reconnectCount < spec.getReconnectAttempts())
       {
          teardown();
          try
@@ -248,18 +248,16 @@
          try
          {
             setup();
-            reconnected = true;
+            log.info("Reconnected with messaging provider.");            
+            break;
          }
          catch (Throwable t)
          {
             log.error("Unable to reconnect " + spec, t);
          }
          
-         if(reconnected)
-         {
-            log.info("Reconnected to JMS provider " + spec);
-            break;
-         }
+         ++reconnectCount;
+
       }
    }
 

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java	2006-11-17 04:03:21 UTC (rev 58486)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java	2006-11-17 04:04:52 UTC (rev 58487)
@@ -114,7 +114,12 @@
 
    /** The DLQ max resent  */
    private int dLQMaxResent;
-
+   
+   //Default to 5 attempts
+   private int reconnectAttempts = 5;
+   
+   private boolean redeliverUnspecified = true;
+   
    /**
     * @return the acknowledgeMode.
     */
@@ -645,4 +650,24 @@
       buffer.append(')');
       return buffer.toString();
    }
+
+   public int getReconnectAttempts()
+   {
+      return reconnectAttempts;
+   }
+
+   public void setReconnectAttempts(int reconnectAttempts)
+   {
+      this.reconnectAttempts = reconnectAttempts;
+   }
+
+   public boolean getRedeliverUnspecified()
+   {
+      return redeliverUnspecified;
+   }
+
+   public void setRedeliverUnspecified(boolean redeliverUnspecified)
+   {
+      this.redeliverUnspecified = redeliverUnspecified;
+   }
 }
\ No newline at end of file

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java	2006-11-17 04:03:21 UTC (rev 58486)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java	2006-11-17 04:04:52 UTC (rev 58487)
@@ -79,6 +79,9 @@
    /** The runtimeHandler */
    RuntimeErrorHandler runtimeHandler = new DefaultRuntimeErrorHandler();
    
+   TransactionDemarcationStrategy txnStrategy;
+   
+   
    /**
     * Create a new JmsServerSession
     * 
@@ -103,23 +106,25 @@
       Connection connection = activation.getConnection();
 
       // Create the session
-      if (connection instanceof XAConnection)
+      if (connection instanceof XAConnection && activation.isDeliveryTransacted())
       {
          xaSession = ((XAConnection) connection).createXASession();
          session = xaSession.getSession();
       }
       else
-      {
-         transacted = spec.isSessionTransacted();
-         acknowledge = spec.getAcknowledgeModeInt();
+      {         
+         transacted = spec.isSessionTransacted();         
+         acknowledge = spec.getAcknowledgeModeInt();         
          session = connection.createSession(transacted, acknowledge);
       }
       
       // Get the endpoint
       MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
       XAResource xaResource = null;
+
       if (activation.isDeliveryTransacted() && xaSession != null)
          xaResource = xaSession.getXAResource();
+      
       endpoint = endpointFactory.createEndpoint(xaResource);
       
       // Set the message listener
@@ -164,15 +169,6 @@
    
    public void onMessage(Message message)
    {
-      TransactionDemarcationStrategy td = null;
-      
-      if (JmsServerSessionPool.USE_OLD)
-      {
-         td = createTransactionDemarcation();
-         if (td == null)
-            return;
-      }
-      
       try
       {
          endpoint.beforeDelivery(JmsActivation.ONMESSAGE);
@@ -180,11 +176,15 @@
          try
          {
             if (dlqHandler == null || dlqHandler.handleRedeliveredMessage(message) == false)
-               ((MessageListener) endpoint).onMessage(message);
+            {
+               MessageListener listener = (MessageListener)endpoint;
+               listener.onMessage(message);
+            }
          }
          finally
          {
             endpoint.afterDelivery();
+            
             if (dlqHandler != null)
                dlqHandler.messageDelivered(message);
          }
@@ -194,17 +194,11 @@
       {
          log.error("Unexpected error delivering message " + message, t);
          
-         if (td != null)
-               td.error();
+         if(txnStrategy != null)
+            txnStrategy.error();
          
-         runtimeHandler.handleRuntimeError(t);
-         
       }
-      finally
-      {
-         if (td != null)
-            td.end();
-      }
+      
    
    }
 
@@ -230,34 +224,33 @@
 
    public void run()
    {
-      TransactionDemarcationStrategy td = null;
       
-      if (JmsServerSessionPool.USE_OLD == false)
+      try
       {
-         td = createTransactionDemarcation();
+         txnStrategy = createTransactionDemarcation();
          
-         if (td == null)
-            return;
+      }catch(Throwable t)
+      {
+         log.error("Error creating transaction demarcation. Cannot continue.");
+         return;
       }
       
+      
       try
       {         
-         if (JmsServerSessionPool.USE_OLD && xaSession != null)
-            xaSession.run();
-         
-         else
-            session.run();
-
-      }
+         session.run();
+      }      
       catch(Throwable t)
       {
-         if (td != null)
-            td.error();
+         if (txnStrategy != null)
+            txnStrategy.error();
          
       }finally
       {
-         if (td != null)
-            td.end();
+         if(txnStrategy != null)
+            txnStrategy.end();
+
+         txnStrategy = null;
       }
       
    }
@@ -293,31 +286,31 @@
    private class DemarcationStrategyFactory
    {
       
-      public DemarcationStrategyFactory()
-      {
-      }
-
       TransactionDemarcationStrategy getStrategy()
       {
+         TransactionDemarcationStrategy current = null;
+         final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
+         final JmsActivation activation = pool.getActivation();
          
-         if(pool.getActivation().isDeliveryTransacted())
+         if(activation.isDeliveryTransacted() && xaSession != null)
          {
             try
             {
-               return new XATransactionDemarcationStrategy();
-            }
+               current = new XATransactionDemarcationStrategy();
+            }    
             catch (Throwable t)
             {
                log.error(this + " error creating transaction demarcation ", t);
-               return null;
             }         
           
          }else
          {
-            return new LocalDemarcationStrategy();
+                        
+               return new LocalDemarcationStrategy();               
             
          }
          
+         return current;
       }
    
    }
@@ -332,7 +325,9 @@
    {
       public void end()
       {
-         if(pool.getActivation().getActivationSpec().isSessionTransacted())
+         final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
+         
+         if(spec.isSessionTransacted())
          {
             if(session != null)
             {
@@ -350,13 +345,28 @@
       
       public void error()
       {
-         if(pool.getActivation().getActivationSpec().isSessionTransacted())
+         final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
+         
+         if(spec.isSessionTransacted())
          {
             if(session != null)
                
                try
                {
-                  session.rollback();
+                  /*
+                   * Looks strange, but this basically means
+                   * 
+                   * If the underlying connection was non-XA and the transaction attribute is REQUIRED
+                   * we roll back. Also, if the underlying connection was non-XA and the transaction
+                   * attribute is NOT_SUPPORTED and the non-standard redelivery behavior is enabled,
+                   * we roll back to force redelivery.
+                   * 
+                   */
+                  if(pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified())
+                  {
+                     session.rollback();                     
+                  }
+               
                }
                catch (JMSException e)
                {




More information about the jboss-cvs-commits mailing list