[hornetq-commits] JBoss hornetq SVN: r9314 - in trunk: src/main/org/hornetq/ra and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jun 15 00:33:36 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-06-15 00:33:35 -0400 (Tue, 15 Jun 2010)
New Revision: 9314

Modified:
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
   trunk/src/main/org/hornetq/ra/HornetQRAMessageConsumer.java
   trunk/src/main/org/hornetq/ra/HornetQRAMessageProducer.java
   trunk/src/main/org/hornetq/ra/HornetQRASession.java
   trunk/src/main/org/hornetq/ra/Util.java
   trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
Log:
Removing cached sessions on our Resource adapter

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-06-14 04:52:14 UTC (rev 9313)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-06-15 04:33:35 UTC (rev 9314)
@@ -519,7 +519,7 @@
                                             closed);
          }
 
-         if (autoCommitAcks)
+         if (autoCommitAcks || tx == null)
          {
             ref.getQueue().acknowledge(ref);
          }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-06-14 04:52:14 UTC (rev 9313)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-06-15 04:33:35 UTC (rev 9314)
@@ -510,11 +510,6 @@
    {
       ServerConsumer consumer = consumers.get(consumerID);
 
-      if (this.xa && tx == null)
-      {
-         throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state");
-      }
-
       consumer.acknowledge(autoCommitAcks, tx, messageID);
    }
 
@@ -1169,11 +1164,6 @@
          throw e;
       }
 
-      if (this.xa && tx == null)
-      {
-         throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state");
-      }
-
       if (tx == null || autoCommitSends)
       {
       }

Modified: trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java	2010-06-14 04:52:14 UTC (rev 9313)
+++ trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java	2010-06-15 04:33:35 UTC (rev 9314)
@@ -26,10 +26,8 @@
 import javax.jms.Connection;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
-import javax.jms.QueueConnection;
 import javax.jms.ResourceAllocationException;
 import javax.jms.Session;
-import javax.jms.TopicConnection;
 import javax.jms.XAConnection;
 import javax.jms.XAQueueConnection;
 import javax.jms.XASession;
@@ -44,7 +42,9 @@
 import javax.resource.spi.ManagedConnectionMetaData;
 import javax.resource.spi.SecurityException;
 import javax.security.auth.Subject;
+import javax.transaction.Status;
 import javax.transaction.SystemException;
+import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAResource;
 
@@ -95,12 +95,10 @@
    // auto-commit session, used outside XA or Local transaction
    private Session session;
 
-   private Session transactedSession;
-
    private XASession xaSession;
 
    private XAResource xaResource;
-   
+
    private final TransactionManager tm;
 
    private boolean inManagedTx;
@@ -133,7 +131,6 @@
 
       connection = null;
       session = null;
-      transactedSession = null;
       xaSession = null;
       xaResource = null;
 
@@ -234,7 +231,7 @@
          HornetQRAManagedConnection.log.trace("destroy()");
       }
 
-      if (isDestroyed.get() ||  connection == null)
+      if (isDestroyed.get() || connection == null)
       {
          return;
       }
@@ -261,11 +258,6 @@
                session.close();
             }
 
-            if (transactedSession != null)
-            {
-               transactedSession.close();
-            }
-
             if (xaSession != null)
             {
                xaSession.close();
@@ -306,7 +298,7 @@
       destroyHandles();
 
       inManagedTx = false;
-      
+
       // I'm recreating the lock object when we return to the pool
       // because it looks too nasty to expect the connection handle
       // to unlock properly in certain race conditions
@@ -339,6 +331,35 @@
       }
    }
 
+   public void checkTransactionActive() throws JMSException
+   {
+      // don't bother looking at the transaction if there's an active XID
+      if (!inManagedTx && tm != null)
+      {
+         try
+         {
+            Transaction tx = tm.getTransaction();
+            if (tx != null)
+            {
+               int status = tx.getStatus();
+               // Only allow states that will actually succeed
+               if (status != Status.STATUS_ACTIVE && status != Status.STATUS_PREPARING &&
+                   status != Status.STATUS_PREPARED &&
+                   status != Status.STATUS_COMMITTING)
+               {
+                  throw new javax.jms.IllegalStateException("Transaction " + tx + " not active");
+               }
+            }
+         }
+         catch (SystemException e)
+         {
+            JMSException jmsE = new javax.jms.IllegalStateException("Unexpected exception on the Transaction ManagerTransaction");
+            jmsE.initCause(e);
+            throw jmsE;
+         }
+      }
+   }
+
    /**
     * Aqquire a lock on the managed connection
     */
@@ -441,7 +462,7 @@
       //
       if (xaResource == null)
       {
-            xaResource = xaSession.getXAResource();
+         xaResource = xaSession.getXAResource();
       }
 
       if (HornetQRAManagedConnection.trace)
@@ -566,36 +587,7 @@
     */
    protected Session getSession() throws JMSException
    {
-      if (xaResource != null && isManagedTx())
-      {
-         if (HornetQRAManagedConnection.trace)
-         {
-            HornetQRAManagedConnection.log.trace("getSession() -> XA session " + xaSession.getSession());
-         }
-
-         return xaSession.getSession();
-      } 
-      else
-      {
-         if (isManagedTx())
-         {
-            if (HornetQRAManagedConnection.trace)
-            {
-               HornetQRAManagedConnection.log.trace("getSession() -> transactedSession " + transactedSession);
-            }
-
-            return transactedSession;
-         }
-         else
-         {
-            if (HornetQRAManagedConnection.trace)
-            {
-               HornetQRAManagedConnection.log.trace("getSession() -> session " + session);
-            }
-
-            return session;
-         }
-      }
+      return session;
    }
 
    /**
@@ -748,8 +740,8 @@
       try
       {
          boolean transacted = cri.isTransacted();
-         int acknowledgeMode =  Session.AUTO_ACKNOWLEDGE;
-         
+         int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+
          if (cri.getType() == HornetQRAConnectionFactory.TOPIC_CONNECTION)
          {
             if (userName != null && password != null)
@@ -764,8 +756,7 @@
             connection.setExceptionListener(this);
 
             xaSession = ((XATopicConnection)connection).createXATopicSession();
-            transactedSession = ((TopicConnection)connection).createTopicSession(transacted, acknowledgeMode);
-            session = ((TopicConnection)connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+            session = xaSession.getSession();
          }
          else if (cri.getType() == HornetQRAConnectionFactory.QUEUE_CONNECTION)
          {
@@ -781,8 +772,7 @@
             connection.setExceptionListener(this);
 
             xaSession = ((XAQueueConnection)connection).createXAQueueSession();
-            transactedSession = ((QueueConnection)connection).createQueueSession(transacted, acknowledgeMode);
-            session = ((QueueConnection)connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+            session = xaSession.getSession();
          }
          else
          {
@@ -798,8 +788,7 @@
             connection.setExceptionListener(this);
 
             xaSession = ((XAConnection)connection).createXASession();
-            transactedSession = connection.createSession(transacted, acknowledgeMode);
-            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session = xaSession.getSession();
          }
       }
       catch (JMSException je)
@@ -807,7 +796,7 @@
          throw new ResourceException(je.getMessage(), je);
       }
    }
-   
+
    private boolean isManagedTx()
    {
       return inManagedTx || isXA();

Modified: trunk/src/main/org/hornetq/ra/HornetQRAMessageConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAMessageConsumer.java	2010-06-14 04:52:14 UTC (rev 9313)
+++ trunk/src/main/org/hornetq/ra/HornetQRAMessageConsumer.java	2010-06-15 04:33:35 UTC (rev 9314)
@@ -96,6 +96,7 @@
       {
          HornetQRAMessageConsumer.log.trace("checkState()");
       }
+      session.checkState();
    }
 
    /**

Modified: trunk/src/main/org/hornetq/ra/HornetQRAMessageProducer.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAMessageProducer.java	2010-06-14 04:52:14 UTC (rev 9313)
+++ trunk/src/main/org/hornetq/ra/HornetQRAMessageProducer.java	2010-06-15 04:33:35 UTC (rev 9314)
@@ -402,6 +402,7 @@
     */
    void checkState() throws JMSException
    {
+      session.checkState();
    }
 
    /**

Modified: trunk/src/main/org/hornetq/ra/HornetQRASession.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRASession.java	2010-06-14 04:52:14 UTC (rev 9313)
+++ trunk/src/main/org/hornetq/ra/HornetQRASession.java	2010-06-15 04:33:35 UTC (rev 9314)
@@ -49,6 +49,8 @@
 import javax.jms.XATopicSession;
 import javax.resource.ResourceException;
 import javax.resource.spi.ConnectionEvent;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
 import javax.transaction.xa.XAResource;
 
 import org.hornetq.core.logging.Logger;
@@ -1612,4 +1614,14 @@
       }
       return (TopicSession)s;
    }
+
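+   /**
+    * Verifies, via the managed connection, that any current transaction is in a usable state.
+    *
+    * @throws JMSException if the check performed by {@code mc.checkTransactionActive()} fails
+    */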
+   public void checkState() throws JMSException
+   {
+      mc.checkTransactionActive();
+   }
 }

Modified: trunk/src/main/org/hornetq/ra/Util.java
===================================================================
--- trunk/src/main/org/hornetq/ra/Util.java	2010-06-14 04:52:14 UTC (rev 9313)
+++ trunk/src/main/org/hornetq/ra/Util.java	2010-06-15 04:33:35 UTC (rev 9314)
@@ -197,7 +197,13 @@
    
 
    /** The Resource adapter can't depend on any provider's specific library. Because of that we use reflection to locate the
-    *  transaction manager during startup. */
+    *  transaction manager during startup. 
+    *  
+    *  
+    *  TODO: https://jira.jboss.org/browse/HORNETQ-417 
+    *        We should use a proper SPI instead of reflection
+    *        We would need to define a proper SPI package for this.
+    *  */
    public static TransactionManager locateTM(final String locatorClass, final String locatorMethod)
    {
       try

Modified: trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java	2010-06-14 04:52:14 UTC (rev 9313)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java	2010-06-15 04:33:35 UTC (rev 9314)
@@ -138,7 +138,7 @@
          
          ClientConsumer cons = session.createConsumer("Test");
          
-         assertNull("Send went through an invalid XA Session", cons.receiveImmediate());
+         assertNotNull("Send went through an invalid XA Session", cons.receiveImmediate());
       }
       finally
       {
@@ -194,9 +194,7 @@
          
          msg = cons.receiveImmediate();
          
-         assertNotNull("Acknowledge went through invalid XA Session", msg);
-         
-         assertNull(cons.receiveImmediate());
+         assertNull("Acknowledge went through invalid XA Session", msg);
 
          
          



More information about the hornetq-commits mailing list