[jboss-cvs] JBossAS SVN: r59868 - branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Jan 20 01:05:21 EST 2007


Author: weston.price at jboss.com
Date: 2007-01-20 01:05:18 -0500 (Sat, 20 Jan 2007)
New Revision: 59868

Modified:
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java
Log:
[JBAS-4001] Backport of the JCA/JMS adapter to address issues in the
old code when using a 1PC resource, as well as the handling of
redelivery for BMT/CMT NotSupported listeners.

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java	2007-01-19 23:40:15 UTC (rev 59867)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java	2007-01-20 06:05:18 UTC (rev 59868)
@@ -162,13 +162,13 @@
    
    public TransactionManager getTransactionManager()
    {
-       if(tm == null)
-       {
-          tm = TransactionManagerLocator.getInstance().locate();
-          
-       }
-       
-       return tm;
+      if (tm == null)
+      {
+         tm = TransactionManagerLocator.getInstance().locate();
+
+      }
+
+      return tm;
    }
 
    /**
@@ -229,9 +229,9 @@
    public void handleFailure(Throwable failure)
    {
       log.warn("Failure in jms activation " + spec, failure);
-      boolean reconnected = false;
+      int reconnectCount = 0;
       
-      while (deliveryActive.get())
+      while (deliveryActive.get() && reconnectCount < spec.getReconnectAttempts())
       {
          teardown();
          try
@@ -248,18 +248,16 @@
          try
          {
             setup();
-            reconnected = true;
+            log.info("Reconnected with messaging provider.");            
+            break;
          }
          catch (Throwable t)
          {
             log.error("Unable to reconnect " + spec, t);
          }
          
-         if(reconnected)
-         {
-            log.info("Reconnected to JMS provider " + spec);
-            break;
-         }
+         ++reconnectCount;
+
       }
    }
 

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java	2007-01-19 23:40:15 UTC (rev 59867)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java	2007-01-20 06:05:18 UTC (rev 59868)
@@ -114,7 +114,13 @@
 
    /** The DLQ max resent  */
    private int dLQMaxResent;
-
+   
+   //Default to 5 attempts
+   private int reconnectAttempts = 5;
+   
+   //Used to specify whether or not we should attempt to redeliver a message in an unspecified txn context
+   private boolean redeliverUnspecified = true;
+   
    /**
     * @return the acknowledgeMode.
     */
@@ -645,4 +651,24 @@
       buffer.append(')');
       return buffer.toString();
    }
+
+   public int getReconnectAttempts()
+   {
+      return reconnectAttempts;
+   }
+
+   public void setReconnectAttempts(int reconnectAttempts)
+   {
+      this.reconnectAttempts = reconnectAttempts;
+   }
+
+   public boolean getRedeliverUnspecified()
+   {
+      return redeliverUnspecified;
+   }
+
+   public void setRedeliverUnspecified(boolean redeliverUnspecified)
+   {
+      this.redeliverUnspecified = redeliverUnspecified;
+   }
 }
\ No newline at end of file

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java	2007-01-19 23:40:15 UTC (rev 59867)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java	2007-01-20 06:05:18 UTC (rev 59868)
@@ -76,9 +76,9 @@
    
    /** Any DLQ handler */
    DLQHandler dlqHandler;
+      
+   TransactionDemarcationStrategy txnStrategy;
    
-   /** The runtimeHandler */
-   RuntimeErrorHandler runtimeHandler = new DefaultRuntimeErrorHandler();
    
    /**
     * Create a new JmsServerSession
@@ -104,23 +104,25 @@
       Connection connection = activation.getConnection();
 
       // Create the session
-      if (connection instanceof XAConnection)
+      if (connection instanceof XAConnection && activation.isDeliveryTransacted())
       {
          xaSession = ((XAConnection) connection).createXASession();
          session = xaSession.getSession();
       }
       else
-      {
-         transacted = spec.isSessionTransacted();
-         acknowledge = spec.getAcknowledgeModeInt();
+      {         
+         transacted = spec.isSessionTransacted();         
+         acknowledge = spec.getAcknowledgeModeInt();         
          session = connection.createSession(transacted, acknowledge);
       }
       
       // Get the endpoint
       MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
       XAResource xaResource = null;
+
       if (activation.isDeliveryTransacted() && xaSession != null)
          xaResource = xaSession.getXAResource();
+      
       endpoint = endpointFactory.createEndpoint(xaResource);
       
       // Set the message listener
@@ -165,15 +167,6 @@
    
    public void onMessage(Message message)
    {
-      TransactionDemarcationStrategy td = null;
-      
-      if (JmsServerSessionPool.USE_OLD)
-      {
-         td = createTransactionDemarcation();
-         if (td == null)
-            return;
-      }
-      
       try
       {
          endpoint.beforeDelivery(JmsActivation.ONMESSAGE);
@@ -181,11 +174,15 @@
          try
          {
             if (dlqHandler == null || dlqHandler.handleRedeliveredMessage(message) == false)
-               ((MessageListener) endpoint).onMessage(message);
+            {
+               MessageListener listener = (MessageListener)endpoint;
+               listener.onMessage(message);
+            }
          }
          finally
          {
             endpoint.afterDelivery();
+            
             if (dlqHandler != null)
                dlqHandler.messageDelivered(message);
          }
@@ -195,17 +192,11 @@
       {
          log.error("Unexpected error delivering message " + message, t);
          
-         if (td != null)
-               td.error();
+         if(txnStrategy != null)
+            txnStrategy.error();
          
-         runtimeHandler.handleRuntimeError(t);
-         
       }
-      finally
-      {
-         if (td != null)
-            td.end();
-      }
+      
    
    }
 
@@ -231,34 +222,33 @@
 
    public void run()
    {
-      TransactionDemarcationStrategy td = null;
       
-      if (JmsServerSessionPool.USE_OLD == false)
+      try
       {
-         td = createTransactionDemarcation();
+         txnStrategy = createTransactionDemarcation();
          
-         if (td == null)
-            return;
+      }catch(Throwable t)
+      {
+         log.error("Error creating transaction demarcation. Cannot continue.");
+         return;
       }
       
+      
       try
       {         
-         if (JmsServerSessionPool.USE_OLD && xaSession != null)
-            xaSession.run();
-         
-         else
-            session.run();
-
-      }
+         session.run();
+      }      
       catch(Throwable t)
       {
-         if (td != null)
-            td.error();
+         if (txnStrategy != null)
+            txnStrategy.error();
          
       }finally
       {
-         if (td != null)
-            td.end();
+         if(txnStrategy != null)
+            txnStrategy.end();
+
+         txnStrategy = null;
       }
       
    }
@@ -294,31 +284,31 @@
    private class DemarcationStrategyFactory
    {
       
-      public DemarcationStrategyFactory()
-      {
-      }
-
       TransactionDemarcationStrategy getStrategy()
       {
+         TransactionDemarcationStrategy current = null;
+         final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
+         final JmsActivation activation = pool.getActivation();
          
-         if(pool.getActivation().isDeliveryTransacted())
+         if(activation.isDeliveryTransacted() && xaSession != null)
          {
             try
             {
-               return new XATransactionDemarcationStrategy();
-            }
+               current = new XATransactionDemarcationStrategy();
+            }    
             catch (Throwable t)
             {
                log.error(this + " error creating transaction demarcation ", t);
-               return null;
             }         
           
          }else
          {
-            return new LocalDemarcationStrategy();
+                        
+               return new LocalDemarcationStrategy();               
             
          }
          
+         return current;
       }
    
    }
@@ -333,7 +323,9 @@
    {
       public void end()
       {
-         if(pool.getActivation().getActivationSpec().isSessionTransacted())
+         final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
+         
+         if(spec.isSessionTransacted())
          {
             if(session != null)
             {
@@ -351,13 +343,28 @@
       
       public void error()
       {
-         if(pool.getActivation().getActivationSpec().isSessionTransacted())
+         final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
+         
+         if(spec.isSessionTransacted())
          {
             if(session != null)
                
                try
                {
-                  session.rollback();
+                  /*
+                   * Looks strange, but this basically means
+                   * 
+                   * If the underlying connection was non-XA and the transaction attribute is REQUIRED
+                   * we rollback. Also, if the underlying connection was non-XA and the transaction
+                   * attribute is NOT_SUPPORT and the non standard redelivery behavior is enabled 
+                   * we rollback to force redelivery.
+                   * 
+                   */
+                  if(pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified())
+                  {
+                     session.rollback();                     
+                  }
+               
                }
                catch (JMSException e)
                {
@@ -371,9 +378,9 @@
 
    private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
    {
-	 
-	  boolean trace = log.isTraceEnabled();
-	  
+     
+      boolean trace = log.isTraceEnabled();
+      
       Transaction trans = null;
       TransactionManager tm = pool.getActivation().getTransactionManager();;
       
@@ -391,8 +398,8 @@
 
                if (xaSession != null)
                {
-                  XAResource res = JcaXAResourceWrapperFactory.getResourceWrapper(xaSession.getXAResource());
-                  
+                  XAResource res = xaSession.getXAResource();
+
                   if (!trans.enlistResource(res))
                   {
                      throw new JMSException("could not enlist resource");
@@ -477,6 +484,17 @@
                {
                   session.commit();
                }
+            
+            }else
+            {
+               tm.suspend();
+               
+               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               {
+                  session.rollback();
+               }
+               
+               
             }
 
          }

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java	2007-01-19 23:40:15 UTC (rev 59867)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java	2007-01-20 06:05:18 UTC (rev 59868)
@@ -45,20 +45,7 @@
 {
    /** The logger */
    private static final Logger log = Logger.getLogger(JmsServerSessionPool.class);
-   
-   public static final boolean USE_OLD;
-   
-   static
-   {
-      USE_OLD = ((Boolean) AccessController.doPrivileged(new PrivilegedAction()
-      {
-         public Object run()
-         {
-            return new Boolean(System.getProperty("org.jboss.jms.asf.useold", "false"));
-         }
-      })).booleanValue();
-   }
-   
+      
    /** The activation */
    JmsActivation activation;
 
@@ -73,7 +60,8 @@
    
    /** The number of sessions */
    int sessionCount = 0;
-
+   
+   
    /**
     * Create a new session pool
     * 
@@ -119,6 +107,7 @@
          log.trace("getServerSession");
 
       ServerSession result = null;
+      
       try
       {
          synchronized (serverSessions)
@@ -126,13 +115,16 @@
             while (true)
             {
                int sessionsSize = serverSessions.size();
+               
                if (stopped)
                   throw new Exception("Cannot get a server session after the pool is stopped");
+               
                else if (sessionsSize > 0)
                {
                   result = (ServerSession) serverSessions.remove(sessionsSize-1);
                   break;
                }
+               
                else
                {
                   try




More information about the jboss-cvs-commits mailing list