[hornetq-commits] JBoss hornetq SVN: r8937 - in trunk/src: main/org/hornetq/ra and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Mar 19 09:38:38 EDT 2010


Author: jmesnil
Date: 2010-03-19 09:38:37 -0400 (Fri, 19 Mar 2010)
New Revision: 8937

Modified:
   trunk/src/config/ra.xml
   trunk/src/main/org/hornetq/ra/HornetQRAConnectionRequestInfo.java
   trunk/src/main/org/hornetq/ra/HornetQRALocalTransaction.java
   trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
   trunk/src/main/org/hornetq/ra/HornetQRASession.java
   trunk/src/main/org/hornetq/ra/HornetQRAXAResource.java
Log:
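Fix resource adapter: remove the UseXA setting and always create XA connections.
Each managed connection now holds both an XA session and a regular session;
getSession() returns the session backing the XA session only when the XA
resource has been obtained and a managed transaction is in progress, and the
regular session otherwise.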

Modified: trunk/src/config/ra.xml
===================================================================
--- trunk/src/config/ra.xml	2010-03-18 18:38:32 UTC (rev 8936)
+++ trunk/src/config/ra.xml	2010-03-19 13:38:37 UTC (rev 8937)
@@ -45,15 +45,9 @@
          <config-property-type>java.lang.String</config-property-type>
          <config-property-value>server-id=0</config-property-value>
       </config-property>
-      <config-property>
-        <description>Use XA?</description>
-        <config-property-name>UseXA</config-property-name>
-        <config-property-type>java.lang.Boolean</config-property-type>
-        <config-property-value>true</config-property-value>
-      </config-property>
       <!--
       <config-property>
-        <description>The method to use for locatingthe transactionmanager</description>
+        <description>The method to use for locating the transactionmanager</description>
         <config-property-name>TransactionManagerLocatorMethod</config-property-name>
         <config-property-type>java.lang.String</config-property-type>
         <config-property-value>getTm</config-property-value>

Modified: trunk/src/main/org/hornetq/ra/HornetQRAConnectionRequestInfo.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAConnectionRequestInfo.java	2010-03-18 18:38:32 UTC (rev 8936)
+++ trunk/src/main/org/hornetq/ra/HornetQRAConnectionRequestInfo.java	2010-03-19 13:38:37 UTC (rev 8937)
@@ -43,9 +43,6 @@
    /** The client id */
    private String clientID;
 
-   /** Use XA */
-   private boolean useXA;
-
    /** The type */
    private final int type;
 
@@ -70,7 +67,6 @@
       userName = prop.getUserName();
       password = prop.getPassword();
       clientID = prop.getClientID();
-      useXA = prop.isUseXA();
       this.type = type;
       transacted = true;
       acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
@@ -138,7 +134,6 @@
       {
          clientID = prop.getClientID();
       }
-      useXA = prop.isUseXA();
    }
 
    /**
@@ -240,20 +235,6 @@
    }
 
    /**
-    * Use XA communication
-    * @return True if XA; otherwise false
-    */
-   public boolean isUseXA()
-   {
-      if (HornetQRAConnectionRequestInfo.trace)
-      {
-         HornetQRAConnectionRequestInfo.log.trace("isUseXA() " + useXA);
-      }
-
-      return useXA;
-   }
-
-   /**
     * Use transactions
     * @return True if transacted; otherwise false
     */
@@ -305,7 +286,6 @@
          return Util.compare(userName, you.getUserName()) && Util.compare(password, you.getPassword()) &&
                 Util.compare(clientID, you.getClientID()) &&
                 type == you.getType() &&
-                useXA == you.isUseXA() &&
                 transacted == you.isTransacted() &&
                 acknowledgeMode == you.getAcknowledgeMode();
       }
@@ -332,7 +312,6 @@
       hash += 31 * hash + (userName != null ? userName.hashCode() : 0);
       hash += 31 * hash + (password != null ? password.hashCode() : 0);
       hash += 31 * hash + Integer.valueOf(type).hashCode();
-      hash += 31 * hash + (useXA ? 1 : 0);
       hash += 31 * hash + (transacted ? 1 : 0);
       hash += 31 * hash + Integer.valueOf(acknowledgeMode).hashCode();
 
@@ -343,7 +322,7 @@
    public String toString()
    {
       return "HornetQRAConnectionRequestInfo[type=" + type +
-         ", useXA=" + useXA + ", transacted=" + transacted + ", acknowledgeMode=" + acknowledgeMode +
+         ", transacted=" + transacted + ", acknowledgeMode=" + acknowledgeMode +
          ", clientID=" + clientID + ", userName=" + userName + ", password=" + password + "]";
    }
 }

Modified: trunk/src/main/org/hornetq/ra/HornetQRALocalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRALocalTransaction.java	2010-03-18 18:38:32 UTC (rev 8936)
+++ trunk/src/main/org/hornetq/ra/HornetQRALocalTransaction.java	2010-03-19 13:38:37 UTC (rev 8937)
@@ -57,6 +57,12 @@
     */
    public void begin() throws ResourceException
    {
+      if (HornetQRALocalTransaction.trace)
+      {
+         HornetQRALocalTransaction.log.trace("begin()");
+      }
+      
+      mc.setInManagedTx(true);
    }
 
    /**
@@ -65,6 +71,11 @@
     */
    public void commit() throws ResourceException
    {
+      if (HornetQRALocalTransaction.trace)
+      {
+         HornetQRALocalTransaction.log.trace("commit()");
+      }
+      
       mc.lock();
       try
       {
@@ -79,6 +90,7 @@
       }
       finally
       {
+         mc.setInManagedTx(false);
          mc.unlock();
       }
    }
@@ -89,6 +101,11 @@
     */
    public void rollback() throws ResourceException
    {
+      if (HornetQRALocalTransaction.trace)
+      {
+         HornetQRALocalTransaction.log.trace("rollback()");
+      }
+      
       mc.lock();
       try
       {
@@ -103,6 +120,7 @@
       }
       finally
       {
+         mc.setInManagedTx(false);
          mc.unlock();
       }
    }

Modified: trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java	2010-03-18 18:38:32 UTC (rev 8936)
+++ trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java	2010-03-19 13:38:37 UTC (rev 8937)
@@ -27,18 +27,13 @@
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
 import javax.jms.ResourceAllocationException;
 import javax.jms.Session;
 import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
 import javax.jms.XAConnection;
 import javax.jms.XAQueueConnection;
-import javax.jms.XAQueueSession;
 import javax.jms.XASession;
 import javax.jms.XATopicConnection;
-import javax.jms.XATopicSession;
-import javax.resource.NotSupportedException;
 import javax.resource.ResourceException;
 import javax.resource.spi.ConnectionEvent;
 import javax.resource.spi.ConnectionEventListener;
@@ -95,22 +90,14 @@
    // Physical JMS connection stuff
    private Connection connection;
 
-   private XAConnection xaConnection;
-
    private Session session;
 
-   private TopicSession topicSession;
-
-   private QueueSession queueSession;
-
    private XASession xaSession;
 
-   private XATopicSession xaTopicSession;
-
-   private XAQueueSession xaQueueSession;
-
    private XAResource xaResource;
 
+   private boolean inManagedTx;
+
    /**
     * Constructor
     * @param mcf The managed connection factory
@@ -136,13 +123,8 @@
       handles = Collections.synchronizedSet(new HashSet<HornetQRASession>());
 
       connection = null;
-      xaConnection = null;
       session = null;
-      topicSession = null;
-      queueSession = null;
       xaSession = null;
-      xaTopicSession = null;
-      xaQueueSession = null;
       xaResource = null;
 
       try
@@ -213,11 +195,6 @@
 
       try
       {
-         if (xaConnection != null)
-         {
-            xaConnection.stop();
-         }
-
          if (connection != null)
          {
             connection.stop();
@@ -247,7 +224,7 @@
          HornetQRAManagedConnection.log.trace("destroy()");
       }
 
-      if (isDestroyed.get() || xaConnection == null && connection == null)
+      if (isDestroyed.get() ||  connection == null)
       {
          return;
       }
@@ -256,14 +233,7 @@
 
       try
       {
-         if (xaConnection != null)
-         {
-            xaConnection.setExceptionListener(null);
-         }
-         else
-         {
-            connection.setExceptionListener(null);
-         }
+         connection.setExceptionListener(null);
       }
       catch (JMSException e)
       {
@@ -276,26 +246,6 @@
       {
          try
          {
-            if (topicSession != null)
-            {
-               topicSession.close();
-            }
-
-            if (xaTopicSession != null)
-            {
-               xaTopicSession.close();
-            }
-
-            if (queueSession != null)
-            {
-               queueSession.close();
-            }
-
-            if (xaQueueSession != null)
-            {
-               xaQueueSession.close();
-            }
-
             if (session != null)
             {
                session.close();
@@ -315,11 +265,6 @@
          {
             connection.close();
          }
-
-         if (xaConnection != null)
-         {
-            xaConnection.close();
-         }
       }
       catch (Throwable e)
       {
@@ -345,6 +290,8 @@
 
       destroyHandles();
 
+      inManagedTx = false;
+      
       // I'm recreating the lock object when we return to the pool
       // because it looks too nasty to expect the connection handle
       // to unlock properly in certain race conditions
@@ -473,29 +420,13 @@
          HornetQRAManagedConnection.log.trace("getXAResource()");
       }
 
-      if (xaConnection == null)
-      {
-         throw new NotSupportedException("Non XA transaction not supported");
-      }
-
       //
       // Spec says a mc must allways return the same XA resource,
       // so we cache it.
       //
       if (xaResource == null)
       {
-         if (xaTopicSession != null)
-         {
-            xaResource = xaTopicSession.getXAResource();
-         }
-         else if (xaQueueSession != null)
-         {
-            xaResource = xaQueueSession.getXAResource();
-         }
-         else
-         {
             xaResource = xaSession.getXAResource();
-         }
       }
 
       if (HornetQRAManagedConnection.trace)
@@ -602,14 +533,7 @@
 
       try
       {
-         if (xaConnection != null)
-         {
-            xaConnection.setExceptionListener(null);
-         }
-         else
-         {
-            connection.setExceptionListener(null);
-         }
+         connection.setExceptionListener(null);
       }
       catch (JMSException e)
       {
@@ -621,72 +545,28 @@
    }
 
    /**
-    * Is managed connection running in XA mode
-    * @return True if XA; otherwise false
+    * Get the session for this connection.
+    * @return The session
+    * @throws JMSException 
     */
-   protected boolean isXA()
+   protected Session getSession() throws JMSException
    {
-      if (HornetQRAManagedConnection.trace)
+      if (xaResource != null && inManagedTx)
       {
-         HornetQRAManagedConnection.log.trace("isXA()");
-      }
-
-      return xaConnection != null;
-   }
-
-   /**
-    * Get the XA session for this connection.
-    * @return The XA session
-    */
-   protected XASession getXASession()
-   {
-      if (HornetQRAManagedConnection.trace)
-      {
-         HornetQRAManagedConnection.log.trace("getXASession()");
-      }
-
-      if (isXA())
-      {
-         if (xaTopicSession != null)
+         if (HornetQRAManagedConnection.trace)
          {
-            return xaTopicSession;
+            HornetQRAManagedConnection.log.trace("getSession() -> XA session " + xaSession.getSession());
          }
-         else if (xaQueueSession != null)
-         {
-            return xaQueueSession;
-         }
-         else
-         {
-            return xaSession;
-         }
-      }
-      else
-      {
-         return null;
-      }
-   }
 
-   /**
-    * Get the session for this connection.
-    * @return The session
-    */
-   protected Session getSession()
-   {
-      if (HornetQRAManagedConnection.trace)
-      {
-         HornetQRAManagedConnection.log.trace("getSession()");
-      }
-
-      if (topicSession != null)
-      {
-         return topicSession;
-      }
-      else if (queueSession != null)
-      {
-         return queueSession;
-      }
+         return xaSession.getSession();
+      } 
       else
       {
+         if (HornetQRAManagedConnection.trace)
+         {
+            HornetQRAManagedConnection.log.trace("getSession() -> session " + xaSession.getSession());
+         }
+
          return session;
       }
    }
@@ -794,11 +674,6 @@
       {
          connection.start();
       }
-
-      if (xaConnection != null)
-      {
-         xaConnection.start();
-      }
    }
 
    /**
@@ -812,11 +687,6 @@
          HornetQRAManagedConnection.log.trace("stop()");
       }
 
-      if (xaConnection != null)
-      {
-         xaConnection.stop();
-      }
-
       if (connection != null)
       {
          connection.stop();
@@ -851,109 +721,55 @@
       try
       {
          boolean transacted = cri.isTransacted();
-         int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-
+         int acknowledgeMode =  Session.AUTO_ACKNOWLEDGE;
+         
          if (cri.getType() == HornetQRAConnectionFactory.TOPIC_CONNECTION)
          {
-            if (cri.isUseXA())
+            if (userName != null && password != null)
             {
-               if (userName != null && password != null)
-               {
-                  xaConnection = mcf.getHornetQConnectionFactory().createXATopicConnection(userName, password);
-               }
-               else
-               {
-                  xaConnection = mcf.getHornetQConnectionFactory().createXATopicConnection();
-               }
-
-               xaConnection.setExceptionListener(this);
-
-               xaTopicSession = ((XATopicConnection)xaConnection).createXATopicSession();
-               topicSession = xaTopicSession.getTopicSession();
+               connection = mcf.getHornetQConnectionFactory().createXATopicConnection(userName, password);
             }
             else
             {
-               if (userName != null && password != null)
-               {
-                  connection = mcf.getHornetQConnectionFactory().createTopicConnection(userName, password);
-               }
-               else
-               {
-                  connection = mcf.getHornetQConnectionFactory().createTopicConnection();
-               }
+               connection = mcf.getHornetQConnectionFactory().createXATopicConnection();
+            }
 
-               connection.setExceptionListener(this);
+            connection.setExceptionListener(this);
 
-               topicSession = ((TopicConnection)connection).createTopicSession(transacted, acknowledgeMode);
-            }
+            xaSession = ((XATopicConnection)connection).createXATopicSession();
+            session = ((TopicConnection)connection).createTopicSession(transacted, acknowledgeMode);
          }
          else if (cri.getType() == HornetQRAConnectionFactory.QUEUE_CONNECTION)
          {
-            if (cri.isUseXA())
+            if (userName != null && password != null)
             {
-               if (userName != null && password != null)
-               {
-                  xaConnection = mcf.getHornetQConnectionFactory().createXAQueueConnection(userName, password);
-               }
-               else
-               {
-                  xaConnection = mcf.getHornetQConnectionFactory().createXAQueueConnection();
-               }
-
-               xaConnection.setExceptionListener(this);
-
-               xaQueueSession = ((XAQueueConnection)xaConnection).createXAQueueSession();
-               queueSession = xaQueueSession.getQueueSession();
+               connection = mcf.getHornetQConnectionFactory().createXAQueueConnection(userName, password);
             }
             else
             {
-               if (userName != null && password != null)
-               {
-                  connection = mcf.getHornetQConnectionFactory().createQueueConnection(userName, password);
-               }
-               else
-               {
-                  connection = mcf.getHornetQConnectionFactory().createQueueConnection();
-               }
+               connection = mcf.getHornetQConnectionFactory().createXAQueueConnection();
+            }
 
-               connection.setExceptionListener(this);
+            connection.setExceptionListener(this);
 
-               queueSession = ((QueueConnection)connection).createQueueSession(transacted, acknowledgeMode);
-            }
+            xaSession = ((XAQueueConnection)connection).createXAQueueSession();
+            session = ((QueueConnection)connection).createQueueSession(transacted, acknowledgeMode);
          }
          else
          {
-            if (cri.isUseXA())
+            if (userName != null && password != null)
             {
-               if (userName != null && password != null)
-               {
-                  xaConnection = mcf.getHornetQConnectionFactory().createXAConnection(userName, password);
-               }
-               else
-               {
-                  xaConnection = mcf.getHornetQConnectionFactory().createXAConnection();
-               }
-
-               xaConnection.setExceptionListener(this);
-
-               xaSession = xaConnection.createXASession();
-               session = xaSession.getSession();
+               connection = mcf.getHornetQConnectionFactory().createXAConnection(userName, password);
             }
             else
             {
-               if (userName != null && password != null)
-               {
-                  connection = mcf.getHornetQConnectionFactory().createConnection(userName, password);
-               }
-               else
-               {
-                  connection = mcf.getHornetQConnectionFactory().createConnection();
-               }
+               connection = mcf.getHornetQConnectionFactory().createXAConnection();
+            }
 
-               connection.setExceptionListener(this);
+            connection.setExceptionListener(this);
 
-               session = connection.createSession(transacted, acknowledgeMode);
-            }
+            xaSession = ((XAConnection)connection).createXASession();
+            session = connection.createSession(transacted, acknowledgeMode);
          }
       }
       catch (JMSException je)
@@ -961,4 +777,9 @@
          throw new ResourceException(je.getMessage(), je);
       }
    }
+
+   protected void setInManagedTx(boolean inManagedTx)
+   {
+      this.inManagedTx = inManagedTx;
+   }
 }

Modified: trunk/src/main/org/hornetq/ra/HornetQRASession.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRASession.java	2010-03-18 18:38:32 UTC (rev 8936)
+++ trunk/src/main/org/hornetq/ra/HornetQRASession.java	2010-03-19 13:38:37 UTC (rev 8937)
@@ -47,6 +47,7 @@
 import javax.jms.XAQueueSession;
 import javax.jms.XASession;
 import javax.jms.XATopicSession;
+import javax.resource.ResourceException;
 import javax.resource.spi.ConnectionEvent;
 import javax.transaction.xa.XAResource;
 
@@ -1214,9 +1215,7 @@
       {
          lock();
 
-         XASession session = getXASessionInternal();
-
-         return session.getXAResource();
+         return getXAResourceInternal();
       }
       catch (Throwable t)
       {
@@ -1551,26 +1550,35 @@
    }
 
    /**
-    * Get the XA session and ensure that it is open
-    * @return The session
+    * Get the XA resource and ensure that it is open
+    * @return The XA Resource
     * @exception JMSException Thrown if an error occurs
     * @exception IllegalStateException The session is closed
     */
-   XASession getXASessionInternal() throws JMSException
+   XAResource getXAResourceInternal() throws JMSException
    {
       if (mc == null)
       {
          throw new IllegalStateException("The session is closed");
       }
 
-      XASession session = mc.getXASession();
+      try
+      {
+         XAResource xares = mc.getXAResource();
 
-      if (HornetQRASession.trace)
+         if (HornetQRASession.trace)
+         {
+            HornetQRASession.log.trace("getXAResourceInternal " + xares + " for " + this);
+         }
+
+         return xares;
+      }
+      catch (ResourceException e)
       {
-         HornetQRASession.log.trace("getXASessionInternal " + session + " for " + this);
+         JMSException jmse = new JMSException("Unable to get XA Resource");
+         jmse.initCause(e);
+         throw jmse;
       }
-
-      return session;
    }
 
    /**

Modified: trunk/src/main/org/hornetq/ra/HornetQRAXAResource.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAXAResource.java	2010-03-18 18:38:32 UTC (rev 8936)
+++ trunk/src/main/org/hornetq/ra/HornetQRAXAResource.java	2010-03-19 13:38:37 UTC (rev 8937)
@@ -76,6 +76,7 @@
       }
       finally
       {
+         managedConnection.setInManagedTx(true);
          managedConnection.unlock();
       }
    }
@@ -100,6 +101,7 @@
       }
       finally
       {
+         managedConnection.setInManagedTx(false);
          managedConnection.unlock();
       }
    }



More information about the hornetq-commits mailing list