[jboss-cvs] JBossAS SVN: r68459 - in branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter: jdbc/local and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Dec 20 11:54:28 EST 2007


Author: adrian at jboss.org
Date: 2007-12-20 11:54:27 -0500 (Thu, 20 Dec 2007)
New Revision: 68459

Added:
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java
Modified:
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnectionFactory.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedConnection.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedPreparedStatement.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedResultSet.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedStatement.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/local/LocalManagedConnection.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/xa/XAManagedConnection.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java
   branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java
Log:
[JBAS-5084] - Add a lock for the session to avoid racing between jms activity and asynchronous rollback

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -34,6 +34,8 @@
 import java.util.Iterator;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.resource.ResourceException;
 import javax.resource.spi.ConnectionEvent;
@@ -66,6 +68,8 @@
    private final int transactionIsolation;
    private final boolean readOnly;
 
+   private ReentrantLock lock = new ReentrantLock();
+   
    private final Collection cels = new ArrayList();
    private final Set handles = new HashSet();
    private PreparedStatementCache psCache = null;
@@ -194,6 +198,11 @@
                mcf.log.warn("Error resetting transaction isolation ", e);
             }
          }
+         // Replace the lock with a fresh instance when this connection
+         // returns to the pool. A connection handle can be dissociated from
+         // the managed connection at an unpredictable point during a race,
+         // so the handle cannot be relied upon to release the lock it took.
+         lock = new ReentrantLock();
       }
    }
 
@@ -241,6 +250,35 @@
       }
    }
 
+   protected void lock()
+   {
+      lock.lock();
+   }
+
+   protected void tryLock() throws SQLException
+   {
+      int tryLock = mcf.getUseTryLock();
+      if (tryLock <= 0)
+      {
+         lock();
+         return;
+      }
+      try
+      {
+         if (lock.tryLock(tryLock, TimeUnit.SECONDS) == false)
+            throw new SQLException("Unable to obtain lock in " + tryLock + " seconds: " + this);
+      }
+      catch (InterruptedException e)
+      {
+         throw new SQLException("Interrupted attempting lock: " + this);
+      }
+   }
+   
+   protected void unlock()
+   {
+      lock.unlock();
+   }
+   
    void closeHandle(WrappedConnection handle)
    {
       synchronized (stateLock)

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnectionFactory.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnectionFactory.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnectionFactory.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -121,6 +121,9 @@
    protected int queryTimeout = 0;
 
    private boolean validateOnMatch;
+
+   /** Seconds to wait when acquiring the connection lock; zero or less means block with no timeout */
+   private int useTryLock = 0;
    
    public BaseWrapperManagedConnectionFactory ()
    {
@@ -330,6 +333,26 @@
       queryTimeout = timeout;
    }
  
+   /**
+    * Get the try-lock timeout.
+    * 
+    * @return the seconds to wait for the connection lock; zero or less means block with no timeout.
+    */
+   public int getUseTryLock()
+   {
+      return useTryLock;
+   }
+
+   /**
+    * Set the try-lock timeout.
+    * 
+    * @param useTryLock the seconds to wait for the connection lock; zero or less means block with no timeout.
+    */
+   public void setUseTryLock(int useTryLock)
+   {
+      this.useTryLock = useTryLock;
+   }
+
    public Set getInvalidConnections(final Set connectionSet) throws ResourceException
    {
       

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedConnection.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedConnection.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedConnection.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -34,6 +34,8 @@
 import java.util.Iterator;
 import java.util.Map;
 
+import javax.resource.spi.ManagedConnection;
+
 import org.jboss.logging.Logger;
 import org.jboss.util.NestedSQLException;
 
@@ -72,6 +74,24 @@
          trackStatements = mc.getTrackStatements();
    }
 
+   protected void lock() throws SQLException
+   {
+      BaseWrapperManagedConnection mc = this.mc;
+      if (mc != null)
+         mc.tryLock();
+      else
+         throw new SQLException("Connection is not associated with a managed connection." + this);
+   }
+
+   protected void unlock()
+   {
+      BaseWrapperManagedConnection mc = this.mc;
+      if (mc != null)
+         mc.unlock();
+      // The managed connection replaces its lock when it returns to the pool,
+      // so skipping the unlock once this handle is disassociated is harmless
+   }
+   
    public WrapperDataSource getDataSource()
    {
       return dataSource;
@@ -84,8 +104,16 @@
    
    public void setReadOnly(boolean readOnly) throws SQLException
    {
-      checkStatus();
-      mc.setJdbcReadOnly(readOnly);
+      lock();
+      try
+      {
+         checkStatus();
+         mc.setJdbcReadOnly(readOnly);
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public boolean isReadOnly() throws SQLException
@@ -139,384 +167,645 @@
 
    public Statement createStatement() throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return new WrappedStatement(this, mc.getConnection().createStatement());
+         checkTransaction();
+         try
+         {
+            return new WrappedStatement(this, mc.getConnection().createStatement());
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return new WrappedStatement(this, mc.getConnection().createStatement(resultSetType, resultSetConcurrency));
+         checkTransaction();
+         try
+         {
+            return new WrappedStatement(this, mc.getConnection().createStatement(resultSetType, resultSetConcurrency));
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
          throws SQLException
    {
-
-      checkTransaction();
+      lock();
       try
       {
-         return new WrappedStatement(this, mc.getConnection()
-               .createStatement(resultSetType, resultSetConcurrency, resultSetHoldability));
+         checkTransaction();
+         try
+         {
+            return new WrappedStatement(this, mc.getConnection()
+                  .createStatement(resultSetType, resultSetConcurrency, resultSetHoldability));
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public PreparedStatement prepareStatement(String sql) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return new WrappedPreparedStatement(this, mc.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY));
+         checkTransaction();
+         try
+         {
+            return new WrappedPreparedStatement(this, mc.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY));
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
          throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return new WrappedPreparedStatement(this, mc.prepareStatement(sql, resultSetType, resultSetConcurrency));
+         checkTransaction();
+         try
+         {
+            return new WrappedPreparedStatement(this, mc.prepareStatement(sql, resultSetType, resultSetConcurrency));
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
          int resultSetHoldability) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return new WrappedPreparedStatement(this, mc.getConnection()
-               .prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability));
+         checkTransaction();
+         try
+         {
+            return new WrappedPreparedStatement(this, mc.getConnection()
+                  .prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability));
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return new WrappedPreparedStatement(this, mc.getConnection().prepareStatement(sql, autoGeneratedKeys));
+         checkTransaction();
+         try
+         {
+            return new WrappedPreparedStatement(this, mc.getConnection().prepareStatement(sql, autoGeneratedKeys));
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return new WrappedPreparedStatement(this, mc.getConnection().prepareStatement(sql, columnIndexes));
+         checkTransaction();
+         try
+         {
+            return new WrappedPreparedStatement(this, mc.getConnection().prepareStatement(sql, columnIndexes));
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException
    {
-
-      checkTransaction();
+      lock();
       try
       {
-         return new WrappedPreparedStatement(this, mc.getConnection().prepareStatement(sql, columnNames));
+         checkTransaction();
+         try
+         {
+            return new WrappedPreparedStatement(this, mc.getConnection().prepareStatement(sql, columnNames));
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public CallableStatement prepareCall(String sql) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return new WrappedCallableStatement(this, mc.prepareCall(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY));
+         checkTransaction();
+         try
+         {
+            return new WrappedCallableStatement(this, mc.prepareCall(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY));
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return new WrappedCallableStatement(this, mc.prepareCall(sql, resultSetType, resultSetConcurrency));
+         checkTransaction();
+         try
+         {
+            return new WrappedCallableStatement(this, mc.prepareCall(sql, resultSetType, resultSetConcurrency));
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
          int resultSetHoldability) throws SQLException
    {
-
-      checkTransaction();
+      lock();
       try
       {
-         return new WrappedCallableStatement(this, mc.getConnection()
-               .prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability));
+         checkTransaction();
+         try
+         {
+            return new WrappedCallableStatement(this, mc.getConnection()
+                  .prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability));
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public String nativeSQL(String sql) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return mc.getConnection().nativeSQL(sql);
+         checkTransaction();
+         try
+         {
+            return mc.getConnection().nativeSQL(sql);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public void setAutoCommit(boolean autocommit) throws SQLException
    {
-      checkStatus();
-      mc.setJdbcAutoCommit(autocommit);
+      lock();
+      try
+      {
+         checkStatus();
+         mc.setJdbcAutoCommit(autocommit);
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public boolean getAutoCommit() throws SQLException
    {
-      checkStatus();
-      return mc.isJdbcAutoCommit();
+      lock();
+      try
+      {
+         checkStatus();
+         return mc.isJdbcAutoCommit();
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void commit() throws SQLException
    {
-      checkTransaction();
-      mc.jdbcCommit();
+      lock();
+      try
+      {
+         checkTransaction();
+         mc.jdbcCommit();
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void rollback() throws SQLException
    {
-      checkTransaction();
-      mc.jdbcRollback();
+      lock();
+      try
+      {
+         checkTransaction();
+         mc.jdbcRollback();
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void rollback(Savepoint savepoint) throws SQLException
    {
-      checkTransaction();
-      mc.jdbcRollback(savepoint);
+      lock();
+      try
+      {
+         checkTransaction();
+         mc.jdbcRollback(savepoint);
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public DatabaseMetaData getMetaData() throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return mc.getConnection().getMetaData();
+         checkTransaction();
+         try
+         {
+            return mc.getConnection().getMetaData();
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public void setCatalog(String catalog) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         mc.getConnection().setCatalog(catalog);
+         checkTransaction();
+         try
+         {
+            mc.getConnection().setCatalog(catalog);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
    public String getCatalog() throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return mc.getConnection().getCatalog();
+         checkTransaction();
+         try
+         {
+            return mc.getConnection().getCatalog();
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public void setTransactionIsolation(int isolationLevel) throws SQLException
    {
-      checkStatus();
-      mc.setJdbcTransactionIsolation(isolationLevel);
+      lock();
+      try
+      {
+         checkStatus();
+         mc.setJdbcTransactionIsolation(isolationLevel);
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public int getTransactionIsolation() throws SQLException
    {
-      checkStatus();
-      return mc.getJdbcTransactionIsolation();
+      lock();
+      try
+      {
+         checkStatus();
+         return mc.getJdbcTransactionIsolation();
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public SQLWarning getWarnings() throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return mc.getConnection().getWarnings();
+         checkTransaction();
+         try
+         {
+            return mc.getConnection().getWarnings();
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public void clearWarnings() throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         mc.getConnection().clearWarnings();
+         checkTransaction();
+         try
+         {
+            mc.getConnection().clearWarnings();
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public Map getTypeMap() throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return mc.getConnection().getTypeMap();
+         checkTransaction();
+         try
+         {
+            return mc.getConnection().getTypeMap();
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public void setTypeMap(Map typeMap) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         mc.getConnection().setTypeMap(typeMap);
+         checkTransaction();
+         try
+         {
+            mc.getConnection().setTypeMap(typeMap);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public void setHoldability(int holdability) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         mc.getConnection().setHoldability(holdability);
+         checkTransaction();
+         try
+         {
+            mc.getConnection().setHoldability(holdability);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public int getHoldability() throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return mc.getConnection().getHoldability();
+         checkTransaction();
+         try
+         {
+            return mc.getConnection().getHoldability();
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public Savepoint setSavepoint() throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return mc.getConnection().setSavepoint();
+         checkTransaction();
+         try
+         {
+            return mc.getConnection().setSavepoint();
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public Savepoint setSavepoint(String name) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         return mc.getConnection().setSavepoint(name);
+         checkTransaction();
+         try
+         {
+            return mc.getConnection().setSavepoint(name);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public void releaseSavepoint(Savepoint savepoint) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         mc.getConnection().releaseSavepoint(savepoint);
+         checkTransaction();
+         try
+         {
+            mc.getConnection().releaseSavepoint(savepoint);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public Connection getUnderlyingConnection() throws SQLException
    {
-      checkTransaction();
-      return mc.getConnection();
+      lock();
+      try
+      {
+         checkTransaction();
+         return mc.getConnection();
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    void checkTransaction() throws SQLException

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedPreparedStatement.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedPreparedStatement.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedPreparedStatement.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -59,14 +59,22 @@
 
    public Statement getUnderlyingStatement() throws SQLException
    {
-      checkState();
-      if (ps instanceof CachedPreparedStatement)
+      lock();
+      try
       {
-         return ((CachedPreparedStatement)ps).getUnderlyingPreparedStatement();
+         checkTransaction();
+         if (ps instanceof CachedPreparedStatement)
+         {
+            return ((CachedPreparedStatement)ps).getUnderlyingPreparedStatement();
+         }
+         else
+         {
+            return ps;
+         }
       }
-      else
+      finally
       {
-         return ps;
+         unlock();
       }
    }
 
@@ -202,15 +210,23 @@
 
    public boolean execute() throws SQLException
    {
-      checkTransaction();
-      try 
+      lock();
+      try
       {
-         checkConfiguredQueryTimeout();
-         return ps.execute();         
+         checkTransaction();
+         try 
+         {
+            checkConfiguredQueryTimeout();
+            return ps.execute();         
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
@@ -229,43 +245,67 @@
 
    public ResultSet executeQuery() throws SQLException
    {
-      checkTransaction();
-      try 
+      lock();
+      try
       {
-         checkConfiguredQueryTimeout();
-         ResultSet resultSet = ps.executeQuery();
-         return registerResultSet(resultSet);
+         checkTransaction();
+         try 
+         {
+            checkConfiguredQueryTimeout();
+            ResultSet resultSet = ps.executeQuery();
+            return registerResultSet(resultSet);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public int executeUpdate() throws SQLException
    {
-      checkTransaction();
-      try 
+      lock();
+      try
       {
-         checkConfiguredQueryTimeout();
-         return ps.executeUpdate();         
+         checkTransaction();
+         try 
+         {
+            checkConfiguredQueryTimeout();
+            return ps.executeUpdate();         
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public void addBatch() throws SQLException
    {
-      checkState();
-      try 
+      lock();
+      try
       {
-         ps.addBatch();         
+         checkTransaction();
+         try 
+         {
+            ps.addBatch();         
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedResultSet.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedResultSet.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedResultSet.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -102,8 +102,16 @@
 
    public ResultSet getUnderlyingResultSet() throws SQLException
    {
-      checkState();
-      return resultSet;
+      statement.lock();
+      try
+      {
+         checkTransaction();
+         return resultSet;
+      }
+      finally
+      {
+         statement.unlock();
+      }
    }
 
    public boolean absolute(int row) throws SQLException
@@ -186,14 +194,22 @@
 
    public void deleteRow() throws SQLException
    {
-      checkState();
+      statement.lock();
       try
       {
-         resultSet.deleteRow();
+         checkTransaction();
+         try
+         {
+            resultSet.deleteRow();
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         statement.unlock();
       }
    }
 
@@ -1074,14 +1090,22 @@
 
    public void insertRow() throws SQLException
    {
-      checkState();
+      statement.lock();
       try
       {
-         resultSet.insertRow();
+         checkTransaction();
+         try
+         {
+            resultSet.insertRow();
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         statement.unlock();
       }
    }
 
@@ -1789,14 +1813,22 @@
 
    public void updateRow() throws SQLException
    {
-      checkState();
+      statement.lock();
       try
       {
-         resultSet.updateRow();
+         checkTransaction();
+         try
+         {
+            resultSet.updateRow();
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         statement.unlock();
       }
    }
 
@@ -1938,6 +1970,11 @@
          if (closed)
             throw new SQLException("The result set is closed.");
       }
+   }
+
+   void checkTransaction() throws SQLException
+   {
+      checkState();
       statement.checkTransactionActive();
    }
 }
\ No newline at end of file

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedStatement.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedStatement.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedStatement.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -62,6 +62,16 @@
       lc.registerStatement(this);
    }
 
+   protected void lock() throws SQLException
+   {
+      lc.lock();
+   }
+
+   protected void unlock()
+   {
+      lc.unlock();
+   }
+   
    public void close() throws SQLException
    {
       synchronized (lock)
@@ -77,57 +87,89 @@
 
    public boolean execute(String sql) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         checkConfiguredQueryTimeout();
-         return s.execute(sql);
+         checkTransaction();
+         try
+         {
+            checkConfiguredQueryTimeout();
+            return s.execute(sql);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public boolean execute(String sql, int autoGeneratedKeys) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         checkConfiguredQueryTimeout();
-         return s.execute(sql, autoGeneratedKeys);
+         checkTransaction();
+         try
+         {
+            checkConfiguredQueryTimeout();
+            return s.execute(sql, autoGeneratedKeys);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public boolean execute(String sql, int[] columnIndexes) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         checkConfiguredQueryTimeout();
-         return s.execute(sql, columnIndexes);
+         checkTransaction();
+         try
+         {
+            checkConfiguredQueryTimeout();
+            return s.execute(sql, columnIndexes);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public boolean execute(String sql, String[]columnNames ) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         checkConfiguredQueryTimeout();
-         return s.execute(sql, columnNames);
+         checkTransaction();
+         try
+         {
+            checkConfiguredQueryTimeout();
+            return s.execute(sql, columnNames);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
@@ -164,72 +206,112 @@
 
    public ResultSet executeQuery(String sql) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         checkConfiguredQueryTimeout();
-         ResultSet result = s.executeQuery(sql);
-         return registerResultSet(result);
+         checkTransaction();
+         try
+         {
+            checkConfiguredQueryTimeout();
+            ResultSet result = s.executeQuery(sql);
+            return registerResultSet(result);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public int executeUpdate(String sql) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         checkConfiguredQueryTimeout();
-         return s.executeUpdate(sql);
+         checkTransaction();
+         try
+         {
+            checkConfiguredQueryTimeout();
+            return s.executeUpdate(sql);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         checkConfiguredQueryTimeout();
-         return s.executeUpdate(sql, autoGeneratedKeys);
+         checkTransaction();
+         try
+         {
+            checkConfiguredQueryTimeout();
+            return s.executeUpdate(sql, autoGeneratedKeys);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public int executeUpdate(String sql, int[] columnIndexes) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         checkConfiguredQueryTimeout();
-         return s.executeUpdate(sql, columnIndexes);
+         checkTransaction();
+         try
+         {
+            checkConfiguredQueryTimeout();
+            return s.executeUpdate(sql, columnIndexes);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public int executeUpdate(String sql, String[] columnNames) throws SQLException
    {
-      checkTransaction();
+      lock();
       try
       {
-         checkConfiguredQueryTimeout();
-         return s.executeUpdate(sql, columnNames);
+         checkTransaction();
+         try
+         {
+            checkConfiguredQueryTimeout();
+            return s.executeUpdate(sql, columnNames);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
@@ -486,55 +568,87 @@
 
    public void addBatch(String sql) throws SQLException
    {
-      checkState();
+      lock();
       try
       {
-         s.addBatch(sql);
+         checkTransaction();
+         try
+         {
+            s.addBatch(sql);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public void clearBatch() throws SQLException
    {
-      checkState();
+      lock();
       try
       {
-         s.clearBatch();
+         checkTransaction();
+         try
+         {
+            s.clearBatch();
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public int[] executeBatch() throws SQLException
    {
-      checkState();
+      lock();
       try
       {
-         checkConfiguredQueryTimeout();
-         return s.executeBatch();
+         checkTransaction();
+         try
+         {
+            checkConfiguredQueryTimeout();
+            return s.executeBatch();
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
    public ResultSet getGeneratedKeys() throws SQLException
    {
-      checkState();
+      lock();
       try
       {
-         ResultSet resultSet = s.getGeneratedKeys();
-         return registerResultSet(resultSet);
+         checkTransaction();
+         try
+         {
+            ResultSet resultSet = s.getGeneratedKeys();
+            return registerResultSet(resultSet);
+         }
+         catch (Throwable t)
+         {
+            throw checkException(t);
+         }
       }
-      catch (Throwable t)
+      finally
       {
-         throw checkException(t);
+         unlock();
       }
    }
 
@@ -553,8 +667,16 @@
 
    public Statement getUnderlyingStatement() throws SQLException
    {
-      checkState();
-      return s;
+      lock();
+      try
+      {
+         checkTransaction();
+         return s;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    protected SQLException checkException(Throwable t)

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/local/LocalManagedConnection.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/local/LocalManagedConnection.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/local/LocalManagedConnection.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -59,30 +59,36 @@
 
    public void commit() throws ResourceException
    {
-      synchronized (stateLock)
-      {
-         if (inManagedTransaction)
-            inManagedTransaction = false;
-      }
+      lock();
       try
       {
+         synchronized (stateLock)
+         {
+            if (inManagedTransaction)
+               inManagedTransaction = false;
+         }
          con.commit();
       }
       catch (SQLException e)
       {
          checkException(e);
       }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void rollback() throws ResourceException
    {
-      synchronized (stateLock)
-      {
-         if (inManagedTransaction)
-            inManagedTransaction = false;
-      }
+      lock();
       try
       {
+         synchronized (stateLock)
+         {
+            if (inManagedTransaction)
+               inManagedTransaction = false;
+         }
          con.rollback();
       }
       catch (SQLException e)
@@ -95,32 +101,44 @@
          {
          }
       }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void begin() throws ResourceException
    {
-      synchronized (stateLock)
+      lock();
+      try
       {
-         if (inManagedTransaction == false)
+         synchronized (stateLock)
          {
-            try
+            if (inManagedTransaction == false)
             {
-               if (underlyingAutoCommit)
+               try
                {
-                  underlyingAutoCommit = false;
-                  con.setAutoCommit(false);
+                  if (underlyingAutoCommit)
+                  {
+                     underlyingAutoCommit = false;
+                     con.setAutoCommit(false);
+                  }
+                  checkState();
+                  inManagedTransaction = true;
                }
-               checkState();
-               inManagedTransaction = true;
+               catch (SQLException e)
+               {
+                  checkException(e);
+               }
             }
-            catch (SQLException e)
-            {
-               checkException(e);
-            }
+            else
+               throw new JBossResourceException("Trying to begin a nested local tx");
          }
-         else
-            throw new JBossResourceException("Trying to begin a nested local tx");
       }
+      finally
+      {
+         unlock();
+      }
    }
 
    Properties getProps()

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/xa/XAManagedConnection.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/xa/XAManagedConnection.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/xa/XAManagedConnection.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -108,81 +108,129 @@
 
    public void start(Xid xid, int flags) throws XAException
    {
+      lock();
       try
       {
-         checkState();
-      }
-      catch (SQLException e)
-      {
-         getLog().warn("Error setting state ", e);
-      }
-      try
-      {
-         xaResource.start(xid, flags);
+         try
+         {
+            checkState();
+         }
+         catch (SQLException e)
+         {
+            getLog().warn("Error setting state ", e);
+         }
+         try
+         {
+            xaResource.start(xid, flags);
+            
+         }catch(XAException e)
+         {
+            //JBAS-3336 Connections that fail in enlistment should not be returned
+            //to the pool
+            if(isFailedXA(e.errorCode))
+            {
+               getLog().error("Start transaction failed for " + this);
+               broadcastConnectionError(e);  
+            }
+            
+            throw e;
+         }
          
-      }catch(XAException e)
-      {
-         //JBAS-3336 Connections that fail in enlistment should not be returned
-         //to the pool
-         if(isFailedXA(e.errorCode))
+         synchronized (stateLock)
          {
-            getLog().error("Start transaction failed for " + this);
-            broadcastConnectionError(e);  
+            currentXid = xid;
+            inManagedTransaction = true;
          }
-         
-         throw e;
       }
-      
-      synchronized (stateLock)
+      finally
       {
-         currentXid = xid;
-         inManagedTransaction = true;
+         unlock();
       }
    }
 
    public void end(Xid xid, int flags) throws XAException
    {
+      lock();
       try
       {
-         xaResource.end(xid, flags);
-         
-      }catch(XAException e)
-      {
-         getLog().error("End transaction failed for XAResource", e);         
-         broadcastConnectionError(e);
-         throw e;
-      }
+         try
+         {
+            xaResource.end(xid, flags);
+         }
+         catch(XAException e)
+         {
+            getLog().error("End transaction failed for XAResource", e);         
+            broadcastConnectionError(e);
+            throw e;
+         }
 
-      //we want to allow ending transactions that are not the current
-      //one. When one does this, inManagedTransaction is still true.
-      synchronized (stateLock)
-      {
-         if (currentXid != null && currentXid.equals(xid))
+         //we want to allow ending transactions that are not the current
+         //one. When one does this, inManagedTransaction is still true.
+         synchronized (stateLock)
          {
-            inManagedTransaction = false;
-            currentXid = null;
+            if (currentXid != null && currentXid.equals(xid))
+            {
+               inManagedTransaction = false;
+               currentXid = null;
+            }
          }
       }
+      finally
+      {
+         unlock();
+      }
    }
 
    public int prepare(Xid xid) throws XAException
    {
-      return xaResource.prepare(xid);
+      lock();
+      try
+      {
+         return xaResource.prepare(xid);
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void commit(Xid xid, boolean onePhase) throws XAException
    {
-      xaResource.commit(xid, onePhase);
+      lock();
+      try
+      {
+         xaResource.commit(xid, onePhase);
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void rollback(Xid xid) throws XAException
    {
-      xaResource.rollback(xid);
+      lock();
+      try
+      {
+         xaResource.rollback(xid);
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void forget(Xid xid) throws XAException
    {
-      xaResource.forget(xid);
+      lock();
+      try
+      {
+         xaResource.forget(xid);
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public Xid[] recover(int flag) throws XAException
@@ -222,66 +270,90 @@
 
    public void begin() throws ResourceException 
    {
-      synchronized (stateLock)
+      lock();
+      try
       {
-         if (inManagedTransaction == false)
+         synchronized (stateLock)
          {
-            try
+            if (inManagedTransaction == false)
             {
-               if (underlyingAutoCommit)
+               try
                {
-                  underlyingAutoCommit = false;
-                  con.setAutoCommit(false);
+                  if (underlyingAutoCommit)
+                  {
+                     underlyingAutoCommit = false;
+                     con.setAutoCommit(false);
+                  }
+                  checkState();
+                  inManagedTransaction = true;
                }
-               checkState();
-               inManagedTransaction = true;
+               catch (SQLException e)
+               {
+                  checkException(e);
+               }
             }
-            catch (SQLException e)
-            {
-               checkException(e);
-            }
+            else
+               throw new JBossResourceException("Trying to begin a nested local tx");
          }
-         else
-            throw new JBossResourceException("Trying to begin a nested local tx");
       }
+      finally
+      {
+         unlock();
+      }
    }
    public void commit() throws ResourceException
    {
-      synchronized (stateLock)
-      {
-         if (inManagedTransaction)
-            inManagedTransaction = false;
-      }
+      lock();
       try
       {
-         con.commit();
+         synchronized (stateLock)
+         {
+            if (inManagedTransaction)
+               inManagedTransaction = false;
+         }
+         try
+         {
+            con.commit();
+         }
+         catch (SQLException e)
+         {
+            checkException(e);
+         }
       }
-      catch (SQLException e)
+      finally
       {
-         checkException(e);
+         unlock();
       }
    }
    public void rollback() throws ResourceException
    {
-      synchronized (stateLock)
-      {
-         if (inManagedTransaction)
-            inManagedTransaction = false;
-      }
+      lock();
       try
       {
-         con.rollback();
-      }
-      catch (SQLException e)
-      {
+         synchronized (stateLock)
+         {
+            if (inManagedTransaction)
+               inManagedTransaction = false;
+         }
          try
          {
-            checkException(e);
+            con.rollback();
          }
-         catch (Exception e2)
+         catch (SQLException e)
          {
+            try
+            {
+               checkException(e);
+            }
+            catch (Exception e2)
+            {
+            }
          }
       }
+      finally
+      {
+         unlock();
+      }
    }
 
 }

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -49,6 +49,7 @@
 
 	public void commit() throws ResourceException
 	{
+	    mc.lock();
 		try
 		{
 			if (mc.getSession().getTransacted())
@@ -58,10 +59,15 @@
 		{
 			throw new JBossResourceException("Could not commit LocalTransaction", e);
 		}
+		finally
+		{
+		   mc.unlock();
+		}
 	}
 
 	public void rollback() throws ResourceException
 	{
+	    mc.lock();
 		try
 		{
 			if (mc.getSession().getTransacted())
@@ -71,5 +77,9 @@
 		{
 			throw new JBossResourceException("Could not rollback LocalTransaction", ex);
 		}
+		finally
+		{
+		   mc.unlock();
+		}
 	}
 }

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -27,12 +27,15 @@
 import java.util.Iterator;
 import java.util.Set;
 import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jms.Connection;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.QueueConnection;
 import javax.jms.QueueSession;
+import javax.jms.ResourceAllocationException;
 import javax.jms.Session;
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
@@ -141,6 +144,8 @@
    private String pwd;
    private boolean isDestroyed;
 
+   private ReentrantLock lock = new ReentrantLock();
+   
    // Physical JMS connection stuff
    private Connection con;
    private Session session;
@@ -332,6 +337,12 @@
 
       // destory handles
       destroyHandles();
+
+      // Recreate the lock object whenever the connection returns to the pool.
+      // A connection handle cannot be relied upon to unlock properly in
+      // certain race conditions, because the point at which the managed
+      // connection is dissociated from the handle is effectively "random".
+      lock = new ReentrantLock();
    }
 
    /**
@@ -360,6 +371,35 @@
             ("ManagedConnection in an illegal state");
    }
 
+   protected void lock()
+   {
+      lock.lock();
+   }
+
+   protected void tryLock() throws JMSException
+   {
+      int tryLock = mcf.getUseTryLock();
+      if (tryLock <= 0)
+      {
+         lock();
+         return;
+      }
+      try
+      {
+         if (lock.tryLock(tryLock, TimeUnit.SECONDS) == false)
+            throw new ResourceAllocationException("Unable to obtain lock in " + tryLock + " seconds: " + this);
+      }
+      catch (InterruptedException e)
+      {
+         throw new ResourceAllocationException("Interrupted attempting lock: " + this);
+      }
+   }
+   
+   protected void unlock()
+   {
+      lock.unlock();
+   }
+
    /**
     * Add a connection event listener.
     *
@@ -412,6 +452,7 @@
       if (log.isTraceEnabled())
          log.trace("XAResource=" + xaResource);
 
+      xaResource = new JmsXAResource(this, xaResource);
       return xaResource;
    }
 

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -57,6 +57,9 @@
 
    /** For local access. */
    private JMSProviderAdapter adapter;
+   
+   /** Seconds to wait when trying to obtain the connection lock; zero or less means lock without a timeout */
+   private int useTryLock = 0;
 
    public JmsManagedConnectionFactory()
    {
@@ -313,7 +316,27 @@
    {
       return adapter;
    }
+   
+   /**
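+    * Get the timeout, in seconds, used when trying to obtain the connection lock.
+    * 
+    * @return the timeout in seconds; zero or less means lock without a timeout.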
+    */
+   public int getUseTryLock()
+   {
+      return useTryLock;
+   }
 
+   /**
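+    * Set the timeout, in seconds, used when trying to obtain the connection lock.
+    * 
+    * @param useTryLock the timeout in seconds; zero or less means lock without a timeout.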
+    */
+   public void setUseTryLock(int useTryLock)
+   {
+      this.useTryLock = useTryLock;
+   }
+
    private ConnectionRequestInfo getInfo(ConnectionRequestInfo info)
    {
       if (info == null)

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -101,54 +101,86 @@
    
    public Message receive() throws JMSException
    {
-      if (trace)
-         log.trace("receive " + this);
-      checkState();
-      Message message = consumer.receive();
-      if (trace)
-         log.trace("received " + this + " result=" + message);
-      if (message == null)
-         return null;
-      else
-         return wrapMessage(message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("receive " + this);
+         checkState();
+         Message message = consumer.receive();
+         if (trace)
+            log.trace("received " + this + " result=" + message);
+         if (message == null)
+            return null;
+         else
+            return wrapMessage(message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 
    public Message receive(long timeout) throws JMSException
    {
-      if (trace)
-         log.trace("receive " + this + " timeout=" + timeout);
-      checkState();
-      Message message = consumer.receive(timeout);
-      if (trace)
-         log.trace("received " + this + " result=" + message);
-      if (message == null)
-         return null;
-      else
-         return wrapMessage(message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("receive " + this + " timeout=" + timeout);
+         checkState();
+         Message message = consumer.receive(timeout);
+         if (trace)
+            log.trace("received " + this + " result=" + message);
+         if (message == null)
+            return null;
+         else
+            return wrapMessage(message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 
    public Message receiveNoWait() throws JMSException
    {
-      if (trace)
-         log.trace("receiveNoWait " + this);
-      checkState();
-      Message message = consumer.receiveNoWait();
-      if (trace)
-         log.trace("received " + this + " result=" + message);
-      if (message == null)
-         return null;
-      else
-         return wrapMessage(message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("receiveNoWait " + this);
+         checkState();
+         Message message = consumer.receiveNoWait();
+         if (trace)
+            log.trace("received " + this + " result=" + message);
+         if (message == null)
+            return null;
+         else
+            return wrapMessage(message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
    
    public void setMessageListener(MessageListener listener) throws JMSException
    {
-      checkState();
-      session.checkStrict();
-      if (listener == null)
-         consumer.setMessageListener(null);
-      else
-         consumer.setMessageListener(wrapMessageListener(listener));
+      session.lock();
+      try
+      {
+         checkState();
+         session.checkStrict();
+         if (listener == null)
+            consumer.setMessageListener(null);
+         else
+            consumer.setMessageListener(wrapMessageListener(listener));
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 
    void closeConsumer() throws JMSException

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -79,42 +79,74 @@
    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
          throws JMSException
    {
-      if (trace)
-         log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
-      checkState();
-      producer.send(destination, message, deliveryMode, priority, timeToLive);
-      if (trace)
-         log.trace("sent " + this + " result=" + message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+         checkState();
+         producer.send(destination, message, deliveryMode, priority, timeToLive);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 
    public void send(Destination destination, Message message) throws JMSException
    {
-      if (trace)
-         log.trace("send " + this + " destination=" + destination + " message=" + message);
-      checkState();
-      producer.send(destination, message);
-      if (trace)
-         log.trace("sent " + this + " result=" + message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message);
+         checkState();
+         producer.send(destination, message);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 
    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
    {
-      if (trace)
-         log.trace("send " + this + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
-      checkState();
-      producer.send(message, deliveryMode, priority, timeToLive);
-      if (trace)
-         log.trace("sent " + this + " result=" + message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+         checkState();
+         producer.send(message, deliveryMode, priority, timeToLive);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 
    public void send(Message message) throws JMSException
    {
-      if (trace)
-         log.trace("send " + this + " message=" + message);
-      checkState();
-      producer.send(message);
-      if (trace)
-         log.trace("sent " + this + " result=" + message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " message=" + message);
+         checkState();
+         producer.send(message);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 
    public int getDeliveryMode() throws JMSException

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -59,21 +59,37 @@
 
    public void send(Queue destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
    {
-      if (trace)
-         log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
-      checkState();
-      producer.send(destination, message, deliveryMode, priority, timeToLive);
-      if (trace)
-         log.trace("sent " + this + " result=" + message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+         checkState();
+         producer.send(destination, message, deliveryMode, priority, timeToLive);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 
    public void send(Queue destination, Message message) throws JMSException
    {
-      if (trace)
-         log.trace("send " + this + " destination=" + destination + " message=" + message);
-      checkState();
-      producer.send(destination, message);
-      if (trace)
-         log.trace("sent " + this + " result=" + message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message);
+         checkState();
+         producer.send(destination, message);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 }

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -22,6 +22,7 @@
 package org.jboss.resource.adapter.jms;
 
 import java.io.Serializable;
+import java.sql.SQLException;
 import java.util.HashSet;
 import java.util.Iterator;
 
@@ -41,6 +42,7 @@
 import javax.jms.QueueReceiver;
 import javax.jms.QueueSender;
 import javax.jms.QueueSession;
+import javax.jms.ResourceAllocationException;
 import javax.jms.Session;
 import javax.jms.StreamMessage;
 import javax.jms.TemporaryQueue;
@@ -102,6 +104,24 @@
       this.sf = sf;
    }
    
+   protected void lock() throws JMSException
+   {
+      JmsManagedConnection mc = this.mc;
+      if (mc != null)
+         mc.tryLock();
+      else
+         throw new ResourceAllocationException("Connection is not associated with a managed connection. " + this);
+   }
+
+   protected void unlock()
+   {
+      JmsManagedConnection mc = this.mc;
+      if (mc != null)
+         mc.unlock();
+      // Nothing to unlock when mc is null: the lock is recreated on return to the pool,
+      // so missing the unlock after disassociation does no harm
+   }
+   
    /**
     * Ensure that the session is opened.
     * 
@@ -248,32 +268,56 @@
    // FIXME - is this really OK, probably not
    public void commit() throws JMSException
    {
-      Session session = getSession();
-      if (info.isTransacted() == false)
-         throw new IllegalStateException("Session is not transacted");
-      if (trace)
-         log.trace("Commit session " + this);
-      session.commit();
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (info.isTransacted() == false)
+            throw new IllegalStateException("Session is not transacted");
+         if (trace)
+            log.trace("Commit session " + this);
+         session.commit();
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void rollback() throws JMSException
    {
-      Session session = getSession();
-      if (info.isTransacted() == false)
-         throw new IllegalStateException("Session is not transacted");
-      if (trace)
-         log.trace("Rollback session " + this);
-      session.rollback();
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (info.isTransacted() == false)
+            throw new IllegalStateException("Session is not transacted");
+         if (trace)
+            log.trace("Rollback session " + this);
+         session.rollback();
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void recover() throws JMSException
    {
-      Session session = getSession();
-      if (info.isTransacted())
-         throw new IllegalStateException("Session is transacted");
-      if (trace)
-         log.trace("Recover session " + this);
-      session.recover();
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (info.isTransacted())
+            throw new IllegalStateException("Session is transacted");
+         if (trace)
+            log.trace("Recover session " + this);
+         session.recover();
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    // --- TopicSession API
@@ -296,28 +340,44 @@
 
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException
    {
-      TopicSession session = getTopicSession();
-      if (trace)
-         log.trace("createSubscriber " + session + " topic=" + topic);
-      TopicSubscriber result = session.createSubscriber(topic);
-      result = new JmsTopicSubscriber(result, this);
-      if (trace)
-         log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         TopicSession session = getTopicSession();
+         if (trace)
+            log.trace("createSubscriber " + session + " topic=" + topic);
+         TopicSubscriber result = session.createSubscriber(topic);
+         result = new JmsTopicSubscriber(result, this);
+         if (trace)
+            log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
    {
-      TopicSession session = getTopicSession();
-      if (trace)
-         log.trace("createSubscriber " + session + " topic=" + topic + " selector=" + messageSelector + " noLocal=" + noLocal);
-      TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal);
-      result = new JmsTopicSubscriber(result, this);
-      if (trace)
-         log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         TopicSession session = getTopicSession();
+         if (trace)
+            log.trace("createSubscriber " + session + " topic=" + topic + " selector=" + messageSelector + " noLocal=" + noLocal);
+         TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal);
+         result = new JmsTopicSubscriber(result, this);
+         if (trace)
+            log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
@@ -327,42 +387,66 @@
          throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");         
       }
       
-      Session session = getSession();
-      if (trace)
-         log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name);
-      TopicSubscriber result = session.createDurableSubscriber(topic, name);
-      result = new JmsTopicSubscriber(result, this);
-      if (trace)
-         log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name);
+         TopicSubscriber result = session.createDurableSubscriber(topic, name);
+         result = new JmsTopicSubscriber(result, this);
+         if (trace)
+            log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
          throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name + " selector=" + messageSelector + " noLocal=" + noLocal);
-      TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal);
-      result = new JmsTopicSubscriber(result, this);
-      if (trace)
-         log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name + " selector=" + messageSelector + " noLocal=" + noLocal);
+         TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal);
+         result = new JmsTopicSubscriber(result, this);
+         if (trace)
+            log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public TopicPublisher createPublisher(Topic topic) throws JMSException
    {
-      TopicSession session = getTopicSession();
-      if (trace)
-         log.trace("createPublisher " + session + " topic=" + topic);
-      TopicPublisher result = session.createPublisher(topic);
-      result = new JmsTopicPublisher(result, this);
-      if (trace)
-         log.trace("createdPublisher " + session + " publisher=" + result);
-      addProducer(result);
-      return result;
+      lock();
+      try
+      {
+         TopicSession session = getTopicSession();
+         if (trace)
+            log.trace("createPublisher " + session + " topic=" + topic);
+         TopicPublisher result = session.createPublisher(topic);
+         result = new JmsTopicPublisher(result, this);
+         if (trace)
+            log.trace("createdPublisher " + session + " publisher=" + result);
+         addProducer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public TemporaryTopic createTemporaryTopic() throws JMSException
@@ -372,14 +456,22 @@
          throw new IllegalStateException("Cannot create temporary topic for javax.jms.QueueSession");         
       }
       
-      Session session = getSession();
-      if (trace)
-         log.trace("createTemporaryTopic " + session);
-      TemporaryTopic temp = session.createTemporaryTopic();
-      if (trace)
-         log.trace("createdTemporaryTopic " + session + " temp=" + temp);
-      sf.addTemporaryTopic(temp);
-      return temp;
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createTemporaryTopic " + session);
+         TemporaryTopic temp = session.createTemporaryTopic();
+         if (trace)
+            log.trace("createdTemporaryTopic " + session + " temp=" + temp);
+         sf.addTemporaryTopic(temp);
+         return temp;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void unsubscribe(String name) throws JMSException
@@ -389,10 +481,18 @@
          throw new IllegalStateException("Cannot unsubscribe for javax.jms.QueueSession");         
       }
 
-      Session session = getSession();
-      if (trace)
-         log.trace("unsubscribe " + session + " name=" + name);
-      session.unsubscribe(name);
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("unsubscribe " + session + " name=" + name);
+         session.unsubscribe(name);
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    //--- QueueSession API
@@ -445,41 +545,65 @@
 
    public QueueReceiver createReceiver(Queue queue) throws JMSException
    {
-      QueueSession session = getQueueSession();
-      if (trace)
-         log.trace("createReceiver " + session + " queue=" + queue);
-      QueueReceiver result = session.createReceiver(queue);
-      result = new JmsQueueReceiver(result, this);
-      if (trace)
-         log.trace("createdReceiver " + session + " receiver=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         QueueSession session = getQueueSession();
+         if (trace)
+            log.trace("createReceiver " + session + " queue=" + queue);
+         QueueReceiver result = session.createReceiver(queue);
+         result = new JmsQueueReceiver(result, this);
+         if (trace)
+            log.trace("createdReceiver " + session + " receiver=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
    {
-      QueueSession session = getQueueSession();
-      if (trace)
-         log.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector);
-      QueueReceiver result = session.createReceiver(queue, messageSelector);
-      result = new JmsQueueReceiver(result, this);
-      if (trace)
-         log.trace("createdReceiver " + session + " receiver=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         QueueSession session = getQueueSession();
+         if (trace)
+            log.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector);
+         QueueReceiver result = session.createReceiver(queue, messageSelector);
+         result = new JmsQueueReceiver(result, this);
+         if (trace)
+            log.trace("createdReceiver " + session + " receiver=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public QueueSender createSender(Queue queue) throws JMSException
    {
-      QueueSession session = getQueueSession();
-      if (trace)
-         log.trace("createSender " + session + " queue=" + queue);
-      QueueSender result = session.createSender(queue);
-      result = new JmsQueueSender(result, this);
-      if (trace)
-         log.trace("createdSender " + session + " sender=" + result);
-      addProducer(result);
-      return result;
+      lock();
+      try
+      {
+         QueueSession session = getQueueSession();
+         if (trace)
+            log.trace("createSender " + session + " queue=" + queue);
+         QueueSender result = session.createSender(queue);
+         result = new JmsQueueSender(result, this);
+         if (trace)
+            log.trace("createdSender " + session + " sender=" + result);
+         addProducer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -489,69 +613,110 @@
          throw new IllegalStateException("Cannot create temporary queue for javax.jms.TopicSession");
          
       }
-      Session session = getSession();
-      if (trace)
-         log.trace("createTemporaryQueue " + session);
-      TemporaryQueue temp = session.createTemporaryQueue();
-      if (trace)
-         log.trace("createdTemporaryQueue " + session + " temp=" + temp);
-      sf.addTemporaryQueue(temp);
-      return temp;
+
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createTemporaryQueue " + session);
+         TemporaryQueue temp = session.createTemporaryQueue();
+         if (trace)
+            log.trace("createdTemporaryQueue " + session + " temp=" + temp);
+         sf.addTemporaryQueue(temp);
+         return temp;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    // -- JMS 1.1
 
    public MessageConsumer createConsumer(Destination destination) throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createConsumer " + session + " dest=" + destination);
-      MessageConsumer result = session.createConsumer(destination);
-      result = new JmsMessageConsumer(result, this);
-      if (trace)
-         log.trace("createdConsumer " + session + " consumer=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createConsumer " + session + " dest=" + destination);
+         MessageConsumer result = session.createConsumer(destination);
+         result = new JmsMessageConsumer(result, this);
+         if (trace)
+            log.trace("createdConsumer " + session + " consumer=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector);
-      MessageConsumer result = session.createConsumer(destination, messageSelector);
-      result = new JmsMessageConsumer(result, this);
-      if (trace)
-         log.trace("createdConsumer " + session + " consumer=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector);
+         MessageConsumer result = session.createConsumer(destination, messageSelector);
+         result = new JmsMessageConsumer(result, this);
+         if (trace)
+            log.trace("createdConsumer " + session + " consumer=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
          throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector + " noLocal=" + noLocal);
-      MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal);
-      result = new JmsMessageConsumer(result, this);
-      if (trace)
-         log.trace("createdConsumer " + session + " consumer=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector + " noLocal=" + noLocal);
+         MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal);
+         result = new JmsMessageConsumer(result, this);
+         if (trace)
+            log.trace("createdConsumer " + session + " consumer=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public MessageProducer createProducer(Destination destination) throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createProducer " + session + " dest=" + destination);
-      MessageProducer result = getSession().createProducer(destination);
-      result = new JmsMessageProducer(result, this);
-      if (trace)
-         log.trace("createdProducer " + session + " producer=" + result);
-      addProducer(result);
-      return result;
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createProducer " + session + " dest=" + destination);
+         MessageProducer result = getSession().createProducer(destination);
+         result = new JmsMessageProducer(result, this);
+         if (trace)
+            log.trace("createdProducer " + session + " producer=" + result);
+         addProducer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public int getAcknowledgeMode() throws JMSException

Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java	2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -59,6 +59,14 @@
 
    public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
    {
+      session.lock();
+      try
+      {
+      }
+      finally
+      {
+         session.unlock();
+      }
       if (trace)
          log.trace("send " + this  + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
       checkState();
@@ -69,32 +77,56 @@
 
    public void publish(Message message) throws JMSException
    {
-      if (trace)
-         log.trace("send " + this + " message=" + message);
-      checkState();
-      ((TopicPublisher) producer).publish(message);
-      if (trace)
-         log.trace("sent " + this + " result=" + message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " message=" + message);
+         checkState();
+         ((TopicPublisher) producer).publish(message);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 
    public void publish(Topic destination, Message message, int deliveryMode, int priority, long timeToLive)
          throws JMSException
    {
-      if (trace)
-         log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
-      checkState();
-      ((TopicPublisher) producer).publish(destination, message, deliveryMode, priority, timeToLive);
-      if (trace)
-         log.trace("sent " + this + " result=" + message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+         checkState();
+         ((TopicPublisher) producer).publish(destination, message, deliveryMode, priority, timeToLive);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 
    public void publish(Topic destination, Message message) throws JMSException
    {
-      if (trace)
-         log.trace("send " + this + " destination=" + destination + " message=" + message);
-      checkState();
-      ((TopicPublisher) producer).publish(destination, message);
-      if (trace)
-         log.trace("sent " + this + " result=" + message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message);
+         checkState();
+         ((TopicPublisher) producer).publish(destination, message);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 }

Added: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java	                        (rev 0)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java	2007-12-20 16:54:27 UTC (rev 68459)
@@ -0,0 +1,153 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2007, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.resource.adapter.jms;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+/**
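+ * XAResource wrapper that locks the managed connection around each start/end/prepare/commit/rollback/forget call on the delegate.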
+ * 
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @version $Revision: 1.1 $
+ */
+public class JmsXAResource implements XAResource
+{
+   /** The managed connection whose lock is taken around the transactional calls */
+   private JmsManagedConnection managedConnection;
+   
+   /** The underlying XA resource that every call is delegated to */
+   private XAResource xaResource;
+
+   /**
+    * Create a new JmsXAResource.
+    * 
+    * @param managedConnection the managed connection
+    * @param xaResource the xa resource
+    */
+   public JmsXAResource(JmsManagedConnection managedConnection, XAResource xaResource)
+   {
+      this.managedConnection = managedConnection;
+      this.xaResource = xaResource;
+   }
+
+   public void start(Xid xid, int flags) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.start(xid, flags);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public void end(Xid xid, int flags) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.end(xid, flags);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public int prepare(Xid xid) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         return xaResource.prepare(xid);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public void commit(Xid xid, boolean onePhase) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.commit(xid, onePhase);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public void rollback(Xid xid) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.rollback(xid);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public void forget(Xid xid) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.forget(xid);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public boolean isSameRM(XAResource xaRes) throws XAException
+   {
+      return xaResource.isSameRM(xaRes);
+   }
+
+   public Xid[] recover(int flag) throws XAException
+   {
+      return xaResource.recover(flag);
+   }
+
+   public int getTransactionTimeout() throws XAException
+   {
+      return xaResource.getTransactionTimeout();
+   }
+
+   public boolean setTransactionTimeout(int seconds) throws XAException
+   {
+      return xaResource.setTransactionTimeout(seconds);
+   }
+   
+   
+}




More information about the jboss-cvs-commits mailing list