[jboss-cvs] JBossAS SVN: r68459 - in branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter: jdbc/local and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Dec 20 11:54:28 EST 2007
Author: adrian at jboss.org
Date: 2007-12-20 11:54:27 -0500 (Thu, 20 Dec 2007)
New Revision: 68459
Added:
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java
Modified:
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnectionFactory.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedConnection.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedPreparedStatement.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedResultSet.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedStatement.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/local/LocalManagedConnection.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/xa/XAManagedConnection.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java
Log:
[JBAS-5084] - Add a lock for the session to avoid racing between jms activity and asynchronous rollback
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -34,6 +34,8 @@
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
@@ -66,6 +68,8 @@
private final int transactionIsolation;
private final boolean readOnly;
+ private ReentrantLock lock = new ReentrantLock();
+
private final Collection cels = new ArrayList();
private final Set handles = new HashSet();
private PreparedStatementCache psCache = null;
@@ -194,6 +198,11 @@
mcf.log.warn("Error resetting transaction isolation ", e);
}
}
+ // Recreate the lock object whenever the connection returns to the pool.
+ // A connection handle cannot be relied upon to unlock properly
+ // in certain race conditions where its dissociation from
+ // the managed connection happens at an unpredictable ("random") point.
+ lock = new ReentrantLock();
}
}
@@ -241,6 +250,35 @@
}
}
+ protected void lock()
+ {
+ lock.lock();
+ }
+
+ protected void tryLock() throws SQLException
+ {
+ int tryLock = mcf.getUseTryLock();
+ if (tryLock <= 0)
+ {
+ lock();
+ return;
+ }
+ try
+ {
+ if (lock.tryLock(tryLock, TimeUnit.SECONDS) == false)
+ throw new SQLException("Unable to obtain lock in " + tryLock + " seconds: " + this);
+ }
+ catch (InterruptedException e)
+ {
+ throw new SQLException("Interrupted attempting lock: " + this);
+ }
+ }
+
+ protected void unlock()
+ {
+ lock.unlock();
+ }
+
void closeHandle(WrappedConnection handle)
{
synchronized (stateLock)
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnectionFactory.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnectionFactory.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnectionFactory.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -121,6 +121,9 @@
protected int queryTimeout = 0;
private boolean validateOnMatch;
+
+ /** Lock timeout in seconds for tryLock; zero or less means block until the lock is obtained */
+ private int useTryLock = 0;
public BaseWrapperManagedConnectionFactory ()
{
@@ -330,6 +333,26 @@
queryTimeout = timeout;
}
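+ /**
+ * Get the lock timeout used when locking the managed connection.
+ *
+ * @return the timeout in seconds; zero or less means block until the lock is obtained.
+ */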
+ public int getUseTryLock()
+ {
+ return useTryLock;
+ }
+
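+ /**
+ * Set the lock timeout used when locking the managed connection.
+ *
+ * @param useTryLock the timeout in seconds; zero or less means block until the lock is obtained.
+ */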
+ public void setUseTryLock(int useTryLock)
+ {
+ this.useTryLock = useTryLock;
+ }
+
public Set getInvalidConnections(final Set connectionSet) throws ResourceException
{
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedConnection.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedConnection.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedConnection.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -34,6 +34,8 @@
import java.util.Iterator;
import java.util.Map;
+import javax.resource.spi.ManagedConnection;
+
import org.jboss.logging.Logger;
import org.jboss.util.NestedSQLException;
@@ -72,6 +74,24 @@
trackStatements = mc.getTrackStatements();
}
+ protected void lock() throws SQLException
+ {
+ BaseWrapperManagedConnection mc = this.mc;
+ if (mc != null)
+ mc.tryLock();
+ else
+ throw new SQLException("Connection is not associated with a managed connection." + this);
+ }
+
+ protected void unlock()
+ {
+ BaseWrapperManagedConnection mc = this.mc;
+ if (mc != null)
+ mc.unlock();
+ // We recreate the lock when returned to the pool
+ // so missing the unlock after disassociation is not important
+ }
+
public WrapperDataSource getDataSource()
{
return dataSource;
@@ -84,8 +104,16 @@
public void setReadOnly(boolean readOnly) throws SQLException
{
- checkStatus();
- mc.setJdbcReadOnly(readOnly);
+ lock();
+ try
+ {
+ checkStatus();
+ mc.setJdbcReadOnly(readOnly);
+ }
+ finally
+ {
+ unlock();
+ }
}
public boolean isReadOnly() throws SQLException
@@ -139,384 +167,645 @@
public Statement createStatement() throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return new WrappedStatement(this, mc.getConnection().createStatement());
+ checkTransaction();
+ try
+ {
+ return new WrappedStatement(this, mc.getConnection().createStatement());
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return new WrappedStatement(this, mc.getConnection().createStatement(resultSetType, resultSetConcurrency));
+ checkTransaction();
+ try
+ {
+ return new WrappedStatement(this, mc.getConnection().createStatement(resultSetType, resultSetConcurrency));
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
throws SQLException
{
-
- checkTransaction();
+ lock();
try
{
- return new WrappedStatement(this, mc.getConnection()
- .createStatement(resultSetType, resultSetConcurrency, resultSetHoldability));
+ checkTransaction();
+ try
+ {
+ return new WrappedStatement(this, mc.getConnection()
+ .createStatement(resultSetType, resultSetConcurrency, resultSetHoldability));
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public PreparedStatement prepareStatement(String sql) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return new WrappedPreparedStatement(this, mc.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY));
+ checkTransaction();
+ try
+ {
+ return new WrappedPreparedStatement(this, mc.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY));
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return new WrappedPreparedStatement(this, mc.prepareStatement(sql, resultSetType, resultSetConcurrency));
+ checkTransaction();
+ try
+ {
+ return new WrappedPreparedStatement(this, mc.prepareStatement(sql, resultSetType, resultSetConcurrency));
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return new WrappedPreparedStatement(this, mc.getConnection()
- .prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability));
+ checkTransaction();
+ try
+ {
+ return new WrappedPreparedStatement(this, mc.getConnection()
+ .prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability));
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return new WrappedPreparedStatement(this, mc.getConnection().prepareStatement(sql, autoGeneratedKeys));
+ checkTransaction();
+ try
+ {
+ return new WrappedPreparedStatement(this, mc.getConnection().prepareStatement(sql, autoGeneratedKeys));
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return new WrappedPreparedStatement(this, mc.getConnection().prepareStatement(sql, columnIndexes));
+ checkTransaction();
+ try
+ {
+ return new WrappedPreparedStatement(this, mc.getConnection().prepareStatement(sql, columnIndexes));
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException
{
-
- checkTransaction();
+ lock();
try
{
- return new WrappedPreparedStatement(this, mc.getConnection().prepareStatement(sql, columnNames));
+ checkTransaction();
+ try
+ {
+ return new WrappedPreparedStatement(this, mc.getConnection().prepareStatement(sql, columnNames));
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public CallableStatement prepareCall(String sql) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return new WrappedCallableStatement(this, mc.prepareCall(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY));
+ checkTransaction();
+ try
+ {
+ return new WrappedCallableStatement(this, mc.prepareCall(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY));
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return new WrappedCallableStatement(this, mc.prepareCall(sql, resultSetType, resultSetConcurrency));
+ checkTransaction();
+ try
+ {
+ return new WrappedCallableStatement(this, mc.prepareCall(sql, resultSetType, resultSetConcurrency));
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException
{
-
- checkTransaction();
+ lock();
try
{
- return new WrappedCallableStatement(this, mc.getConnection()
- .prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability));
+ checkTransaction();
+ try
+ {
+ return new WrappedCallableStatement(this, mc.getConnection()
+ .prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability));
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public String nativeSQL(String sql) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return mc.getConnection().nativeSQL(sql);
+ checkTransaction();
+ try
+ {
+ return mc.getConnection().nativeSQL(sql);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public void setAutoCommit(boolean autocommit) throws SQLException
{
- checkStatus();
- mc.setJdbcAutoCommit(autocommit);
+ lock();
+ try
+ {
+ checkStatus();
+ mc.setJdbcAutoCommit(autocommit);
+ }
+ finally
+ {
+ unlock();
+ }
}
public boolean getAutoCommit() throws SQLException
{
- checkStatus();
- return mc.isJdbcAutoCommit();
+ lock();
+ try
+ {
+ checkStatus();
+ return mc.isJdbcAutoCommit();
+ }
+ finally
+ {
+ unlock();
+ }
}
public void commit() throws SQLException
{
- checkTransaction();
- mc.jdbcCommit();
+ lock();
+ try
+ {
+ checkTransaction();
+ mc.jdbcCommit();
+ }
+ finally
+ {
+ unlock();
+ }
}
public void rollback() throws SQLException
{
- checkTransaction();
- mc.jdbcRollback();
+ lock();
+ try
+ {
+ checkTransaction();
+ mc.jdbcRollback();
+ }
+ finally
+ {
+ unlock();
+ }
}
public void rollback(Savepoint savepoint) throws SQLException
{
- checkTransaction();
- mc.jdbcRollback(savepoint);
+ lock();
+ try
+ {
+ checkTransaction();
+ mc.jdbcRollback(savepoint);
+ }
+ finally
+ {
+ unlock();
+ }
}
public DatabaseMetaData getMetaData() throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return mc.getConnection().getMetaData();
+ checkTransaction();
+ try
+ {
+ return mc.getConnection().getMetaData();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public void setCatalog(String catalog) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- mc.getConnection().setCatalog(catalog);
+ checkTransaction();
+ try
+ {
+ mc.getConnection().setCatalog(catalog);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public String getCatalog() throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return mc.getConnection().getCatalog();
+ checkTransaction();
+ try
+ {
+ return mc.getConnection().getCatalog();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public void setTransactionIsolation(int isolationLevel) throws SQLException
{
- checkStatus();
- mc.setJdbcTransactionIsolation(isolationLevel);
+ lock();
+ try
+ {
+ checkStatus();
+ mc.setJdbcTransactionIsolation(isolationLevel);
+ }
+ finally
+ {
+ unlock();
+ }
}
public int getTransactionIsolation() throws SQLException
{
- checkStatus();
- return mc.getJdbcTransactionIsolation();
+ lock();
+ try
+ {
+ checkStatus();
+ return mc.getJdbcTransactionIsolation();
+ }
+ finally
+ {
+ unlock();
+ }
}
public SQLWarning getWarnings() throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return mc.getConnection().getWarnings();
+ checkTransaction();
+ try
+ {
+ return mc.getConnection().getWarnings();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public void clearWarnings() throws SQLException
{
- checkTransaction();
+ lock();
try
{
- mc.getConnection().clearWarnings();
+ checkTransaction();
+ try
+ {
+ mc.getConnection().clearWarnings();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public Map getTypeMap() throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return mc.getConnection().getTypeMap();
+ checkTransaction();
+ try
+ {
+ return mc.getConnection().getTypeMap();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public void setTypeMap(Map typeMap) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- mc.getConnection().setTypeMap(typeMap);
+ checkTransaction();
+ try
+ {
+ mc.getConnection().setTypeMap(typeMap);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public void setHoldability(int holdability) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- mc.getConnection().setHoldability(holdability);
+ checkTransaction();
+ try
+ {
+ mc.getConnection().setHoldability(holdability);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public int getHoldability() throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return mc.getConnection().getHoldability();
+ checkTransaction();
+ try
+ {
+ return mc.getConnection().getHoldability();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public Savepoint setSavepoint() throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return mc.getConnection().setSavepoint();
+ checkTransaction();
+ try
+ {
+ return mc.getConnection().setSavepoint();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public Savepoint setSavepoint(String name) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- return mc.getConnection().setSavepoint(name);
+ checkTransaction();
+ try
+ {
+ return mc.getConnection().setSavepoint(name);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public void releaseSavepoint(Savepoint savepoint) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- mc.getConnection().releaseSavepoint(savepoint);
+ checkTransaction();
+ try
+ {
+ mc.getConnection().releaseSavepoint(savepoint);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public Connection getUnderlyingConnection() throws SQLException
{
- checkTransaction();
- return mc.getConnection();
+ lock();
+ try
+ {
+ checkTransaction();
+ return mc.getConnection();
+ }
+ finally
+ {
+ unlock();
+ }
}
void checkTransaction() throws SQLException
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedPreparedStatement.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedPreparedStatement.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedPreparedStatement.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -59,14 +59,22 @@
public Statement getUnderlyingStatement() throws SQLException
{
- checkState();
- if (ps instanceof CachedPreparedStatement)
+ lock();
+ try
{
- return ((CachedPreparedStatement)ps).getUnderlyingPreparedStatement();
+ checkTransaction();
+ if (ps instanceof CachedPreparedStatement)
+ {
+ return ((CachedPreparedStatement)ps).getUnderlyingPreparedStatement();
+ }
+ else
+ {
+ return ps;
+ }
}
- else
+ finally
{
- return ps;
+ unlock();
}
}
@@ -202,15 +210,23 @@
public boolean execute() throws SQLException
{
- checkTransaction();
- try
+ lock();
+ try
{
- checkConfiguredQueryTimeout();
- return ps.execute();
+ checkTransaction();
+ try
+ {
+ checkConfiguredQueryTimeout();
+ return ps.execute();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
@@ -229,43 +245,67 @@
public ResultSet executeQuery() throws SQLException
{
- checkTransaction();
- try
+ lock();
+ try
{
- checkConfiguredQueryTimeout();
- ResultSet resultSet = ps.executeQuery();
- return registerResultSet(resultSet);
+ checkTransaction();
+ try
+ {
+ checkConfiguredQueryTimeout();
+ ResultSet resultSet = ps.executeQuery();
+ return registerResultSet(resultSet);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public int executeUpdate() throws SQLException
{
- checkTransaction();
- try
+ lock();
+ try
{
- checkConfiguredQueryTimeout();
- return ps.executeUpdate();
+ checkTransaction();
+ try
+ {
+ checkConfiguredQueryTimeout();
+ return ps.executeUpdate();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public void addBatch() throws SQLException
{
- checkState();
- try
+ lock();
+ try
{
- ps.addBatch();
+ checkTransaction();
+ try
+ {
+ ps.addBatch();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedResultSet.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedResultSet.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedResultSet.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -102,8 +102,16 @@
public ResultSet getUnderlyingResultSet() throws SQLException
{
- checkState();
- return resultSet;
+ statement.lock();
+ try
+ {
+ checkTransaction();
+ return resultSet;
+ }
+ finally
+ {
+ statement.unlock();
+ }
}
public boolean absolute(int row) throws SQLException
@@ -186,14 +194,22 @@
public void deleteRow() throws SQLException
{
- checkState();
+ statement.lock();
try
{
- resultSet.deleteRow();
+ checkTransaction();
+ try
+ {
+ resultSet.deleteRow();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ statement.unlock();
}
}
@@ -1074,14 +1090,22 @@
public void insertRow() throws SQLException
{
- checkState();
+ statement.lock();
try
{
- resultSet.insertRow();
+ checkTransaction();
+ try
+ {
+ resultSet.insertRow();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ statement.unlock();
}
}
@@ -1789,14 +1813,22 @@
public void updateRow() throws SQLException
{
- checkState();
+ statement.lock();
try
{
- resultSet.updateRow();
+ checkTransaction();
+ try
+ {
+ resultSet.updateRow();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ statement.unlock();
}
}
@@ -1938,6 +1970,11 @@
if (closed)
throw new SQLException("The result set is closed.");
}
+ }
+
+ void checkTransaction() throws SQLException
+ {
+ checkState();
statement.checkTransactionActive();
}
}
\ No newline at end of file
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedStatement.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedStatement.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedStatement.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -62,6 +62,16 @@
lc.registerStatement(this);
}
+ protected void lock() throws SQLException
+ {
+ lc.lock();
+ }
+
+ protected void unlock()
+ {
+ lc.unlock();
+ }
+
public void close() throws SQLException
{
synchronized (lock)
@@ -77,57 +87,89 @@
public boolean execute(String sql) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- checkConfiguredQueryTimeout();
- return s.execute(sql);
+ checkTransaction();
+ try
+ {
+ checkConfiguredQueryTimeout();
+ return s.execute(sql);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- checkConfiguredQueryTimeout();
- return s.execute(sql, autoGeneratedKeys);
+ checkTransaction();
+ try
+ {
+ checkConfiguredQueryTimeout();
+ return s.execute(sql, autoGeneratedKeys);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public boolean execute(String sql, int[] columnIndexes) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- checkConfiguredQueryTimeout();
- return s.execute(sql, columnIndexes);
+ checkTransaction();
+ try
+ {
+ checkConfiguredQueryTimeout();
+ return s.execute(sql, columnIndexes);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public boolean execute(String sql, String[]columnNames ) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- checkConfiguredQueryTimeout();
- return s.execute(sql, columnNames);
+ checkTransaction();
+ try
+ {
+ checkConfiguredQueryTimeout();
+ return s.execute(sql, columnNames);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
@@ -164,72 +206,112 @@
public ResultSet executeQuery(String sql) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- checkConfiguredQueryTimeout();
- ResultSet result = s.executeQuery(sql);
- return registerResultSet(result);
+ checkTransaction();
+ try
+ {
+ checkConfiguredQueryTimeout();
+ ResultSet result = s.executeQuery(sql);
+ return registerResultSet(result);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public int executeUpdate(String sql) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- checkConfiguredQueryTimeout();
- return s.executeUpdate(sql);
+ checkTransaction();
+ try
+ {
+ checkConfiguredQueryTimeout();
+ return s.executeUpdate(sql);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- checkConfiguredQueryTimeout();
- return s.executeUpdate(sql, autoGeneratedKeys);
+ checkTransaction();
+ try
+ {
+ checkConfiguredQueryTimeout();
+ return s.executeUpdate(sql, autoGeneratedKeys);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- checkConfiguredQueryTimeout();
- return s.executeUpdate(sql, columnIndexes);
+ checkTransaction();
+ try
+ {
+ checkConfiguredQueryTimeout();
+ return s.executeUpdate(sql, columnIndexes);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public int executeUpdate(String sql, String[] columnNames) throws SQLException
{
- checkTransaction();
+ lock();
try
{
- checkConfiguredQueryTimeout();
- return s.executeUpdate(sql, columnNames);
+ checkTransaction();
+ try
+ {
+ checkConfiguredQueryTimeout();
+ return s.executeUpdate(sql, columnNames);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
@@ -486,55 +568,87 @@
public void addBatch(String sql) throws SQLException
{
- checkState();
+ lock();
try
{
- s.addBatch(sql);
+ checkTransaction();
+ try
+ {
+ s.addBatch(sql);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public void clearBatch() throws SQLException
{
- checkState();
+ lock();
try
{
- s.clearBatch();
+ checkTransaction();
+ try
+ {
+ s.clearBatch();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public int[] executeBatch() throws SQLException
{
- checkState();
+ lock();
try
{
- checkConfiguredQueryTimeout();
- return s.executeBatch();
+ checkTransaction();
+ try
+ {
+ checkConfiguredQueryTimeout();
+ return s.executeBatch();
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
public ResultSet getGeneratedKeys() throws SQLException
{
- checkState();
+ lock();
try
{
- ResultSet resultSet = s.getGeneratedKeys();
- return registerResultSet(resultSet);
+ checkTransaction();
+ try
+ {
+ ResultSet resultSet = s.getGeneratedKeys();
+ return registerResultSet(resultSet);
+ }
+ catch (Throwable t)
+ {
+ throw checkException(t);
+ }
}
- catch (Throwable t)
+ finally
{
- throw checkException(t);
+ unlock();
}
}
@@ -553,8 +667,16 @@
public Statement getUnderlyingStatement() throws SQLException
{
- checkState();
- return s;
+ lock();
+ try
+ {
+ checkTransaction();
+ return s;
+ }
+ finally
+ {
+ unlock();
+ }
}
protected SQLException checkException(Throwable t)
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/local/LocalManagedConnection.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/local/LocalManagedConnection.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/local/LocalManagedConnection.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -59,30 +59,36 @@
public void commit() throws ResourceException
{
- synchronized (stateLock)
- {
- if (inManagedTransaction)
- inManagedTransaction = false;
- }
+ lock();
try
{
+ synchronized (stateLock)
+ {
+ if (inManagedTransaction)
+ inManagedTransaction = false;
+ }
con.commit();
}
catch (SQLException e)
{
checkException(e);
}
+ finally
+ {
+ unlock();
+ }
}
public void rollback() throws ResourceException
{
- synchronized (stateLock)
- {
- if (inManagedTransaction)
- inManagedTransaction = false;
- }
+ lock();
try
{
+ synchronized (stateLock)
+ {
+ if (inManagedTransaction)
+ inManagedTransaction = false;
+ }
con.rollback();
}
catch (SQLException e)
@@ -95,32 +101,44 @@
{
}
}
+ finally
+ {
+ unlock();
+ }
}
public void begin() throws ResourceException
{
- synchronized (stateLock)
+ lock();
+ try
{
- if (inManagedTransaction == false)
+ synchronized (stateLock)
{
- try
+ if (inManagedTransaction == false)
{
- if (underlyingAutoCommit)
+ try
{
- underlyingAutoCommit = false;
- con.setAutoCommit(false);
+ if (underlyingAutoCommit)
+ {
+ underlyingAutoCommit = false;
+ con.setAutoCommit(false);
+ }
+ checkState();
+ inManagedTransaction = true;
}
- checkState();
- inManagedTransaction = true;
+ catch (SQLException e)
+ {
+ checkException(e);
+ }
}
- catch (SQLException e)
- {
- checkException(e);
- }
+ else
+ throw new JBossResourceException("Trying to begin a nested local tx");
}
- else
- throw new JBossResourceException("Trying to begin a nested local tx");
}
+ finally
+ {
+ unlock();
+ }
}
Properties getProps()
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/xa/XAManagedConnection.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/xa/XAManagedConnection.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jdbc/xa/XAManagedConnection.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -108,81 +108,129 @@
public void start(Xid xid, int flags) throws XAException
{
+ lock();
try
{
- checkState();
- }
- catch (SQLException e)
- {
- getLog().warn("Error setting state ", e);
- }
- try
- {
- xaResource.start(xid, flags);
+ try
+ {
+ checkState();
+ }
+ catch (SQLException e)
+ {
+ getLog().warn("Error setting state ", e);
+ }
+ try
+ {
+ xaResource.start(xid, flags);
+
+ }catch(XAException e)
+ {
+ //JBAS-3336 Connections that fail in enlistment should not be returned
+ //to the pool
+ if(isFailedXA(e.errorCode))
+ {
+ getLog().error("Start transaction failed for " + this);
+ broadcastConnectionError(e);
+ }
+
+ throw e;
+ }
- }catch(XAException e)
- {
- //JBAS-3336 Connections that fail in enlistment should not be returned
- //to the pool
- if(isFailedXA(e.errorCode))
+ synchronized (stateLock)
{
- getLog().error("Start transaction failed for " + this);
- broadcastConnectionError(e);
+ currentXid = xid;
+ inManagedTransaction = true;
}
-
- throw e;
}
-
- synchronized (stateLock)
+ finally
{
- currentXid = xid;
- inManagedTransaction = true;
+ unlock();
}
}
public void end(Xid xid, int flags) throws XAException
{
+ lock();
try
{
- xaResource.end(xid, flags);
-
- }catch(XAException e)
- {
- getLog().error("End transaction failed for XAResource", e);
- broadcastConnectionError(e);
- throw e;
- }
+ try
+ {
+ xaResource.end(xid, flags);
+ }
+ catch(XAException e)
+ {
+ getLog().error("End transaction failed for XAResource", e);
+ broadcastConnectionError(e);
+ throw e;
+ }
- //we want to allow ending transactions that are not the current
- //one. When one does this, inManagedTransaction is still true.
- synchronized (stateLock)
- {
- if (currentXid != null && currentXid.equals(xid))
+ //we want to allow ending transactions that are not the current
+ //one. When one does this, inManagedTransaction is still true.
+ synchronized (stateLock)
{
- inManagedTransaction = false;
- currentXid = null;
+ if (currentXid != null && currentXid.equals(xid))
+ {
+ inManagedTransaction = false;
+ currentXid = null;
+ }
}
}
+ finally
+ {
+ unlock();
+ }
}
public int prepare(Xid xid) throws XAException
{
- return xaResource.prepare(xid);
+ lock();
+ try
+ {
+ return xaResource.prepare(xid);
+ }
+ finally
+ {
+ unlock();
+ }
}
public void commit(Xid xid, boolean onePhase) throws XAException
{
- xaResource.commit(xid, onePhase);
+ lock();
+ try
+ {
+ xaResource.commit(xid, onePhase);
+ }
+ finally
+ {
+ unlock();
+ }
}
public void rollback(Xid xid) throws XAException
{
- xaResource.rollback(xid);
+ lock();
+ try
+ {
+ xaResource.rollback(xid);
+ }
+ finally
+ {
+ unlock();
+ }
}
public void forget(Xid xid) throws XAException
{
- xaResource.forget(xid);
+ lock();
+ try
+ {
+ xaResource.forget(xid);
+ }
+ finally
+ {
+ unlock();
+ }
}
public Xid[] recover(int flag) throws XAException
@@ -222,66 +270,90 @@
public void begin() throws ResourceException
{
- synchronized (stateLock)
+ lock();
+ try
{
- if (inManagedTransaction == false)
+ synchronized (stateLock)
{
- try
+ if (inManagedTransaction == false)
{
- if (underlyingAutoCommit)
+ try
{
- underlyingAutoCommit = false;
- con.setAutoCommit(false);
+ if (underlyingAutoCommit)
+ {
+ underlyingAutoCommit = false;
+ con.setAutoCommit(false);
+ }
+ checkState();
+ inManagedTransaction = true;
}
- checkState();
- inManagedTransaction = true;
+ catch (SQLException e)
+ {
+ checkException(e);
+ }
}
- catch (SQLException e)
- {
- checkException(e);
- }
+ else
+ throw new JBossResourceException("Trying to begin a nested local tx");
}
- else
- throw new JBossResourceException("Trying to begin a nested local tx");
}
+ finally
+ {
+ unlock();
+ }
}
public void commit() throws ResourceException
{
- synchronized (stateLock)
- {
- if (inManagedTransaction)
- inManagedTransaction = false;
- }
+ lock();
try
{
- con.commit();
+ synchronized (stateLock)
+ {
+ if (inManagedTransaction)
+ inManagedTransaction = false;
+ }
+ try
+ {
+ con.commit();
+ }
+ catch (SQLException e)
+ {
+ checkException(e);
+ }
}
- catch (SQLException e)
+ finally
{
- checkException(e);
+ unlock();
}
}
public void rollback() throws ResourceException
{
- synchronized (stateLock)
- {
- if (inManagedTransaction)
- inManagedTransaction = false;
- }
+ lock();
try
{
- con.rollback();
- }
- catch (SQLException e)
- {
+ synchronized (stateLock)
+ {
+ if (inManagedTransaction)
+ inManagedTransaction = false;
+ }
try
{
- checkException(e);
+ con.rollback();
}
- catch (Exception e2)
+ catch (SQLException e)
{
+ try
+ {
+ checkException(e);
+ }
+ catch (Exception e2)
+ {
+ }
}
}
+ finally
+ {
+ unlock();
+ }
}
}
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -49,6 +49,7 @@
public void commit() throws ResourceException
{
+ mc.lock();
try
{
if (mc.getSession().getTransacted())
@@ -58,10 +59,15 @@
{
throw new JBossResourceException("Could not commit LocalTransaction", e);
}
+ finally
+ {
+ mc.unlock();
+ }
}
public void rollback() throws ResourceException
{
+ mc.lock();
try
{
if (mc.getSession().getTransacted())
@@ -71,5 +77,9 @@
{
throw new JBossResourceException("Could not rollback LocalTransaction", ex);
}
+ finally
+ {
+ mc.unlock();
+ }
}
}
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -27,12 +27,15 @@
import java.util.Iterator;
import java.util.Set;
import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
+import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
@@ -141,6 +144,8 @@
private String pwd;
private boolean isDestroyed;
+ private ReentrantLock lock = new ReentrantLock();
+
// Physical JMS connection stuff
private Connection con;
private Session session;
@@ -332,6 +337,12 @@
// destory handles
destroyHandles();
+
+ // I'm recreating the lock object when we return to the pool
+ // because it looks too nasty to expect the connection handle
+ // to unlock properly in certain race conditions
+ // where the dissociation of the managed connection is "random".
+ lock = new ReentrantLock();
}
/**
@@ -360,6 +371,35 @@
("ManagedConnection in an illegal state");
}
+ protected void lock()
+ {
+ lock.lock();
+ }
+
+ /**
+ * Acquire the managed connection lock, optionally giving up after a timeout.
+ *
+ * The timeout is read from the connection factory's useTryLock value and is
+ * interpreted in seconds. A value of zero or less falls back to the
+ * unbounded {@link #lock()}.
+ *
+ * @throws JMSException a ResourceAllocationException when the lock is not
+ * obtained within the timeout or the waiting thread is interrupted
+ */
+ protected void tryLock() throws JMSException
+ {
+ int tryLock = mcf.getUseTryLock();
+ if (tryLock <= 0)
+ {
+ // No timeout configured: wait for the lock without a time limit
+ lock();
+ return;
+ }
+ try
+ {
+ if (lock.tryLock(tryLock, TimeUnit.SECONDS) == false)
+ throw new ResourceAllocationException("Unable to obtain lock in " + tryLock + " seconds: " + this);
+ }
+ catch (InterruptedException e)
+ {
+ // The interrupt flag is cleared when InterruptedException is thrown;
+ // set it again so code further up the stack can still see the interrupt
+ Thread.currentThread().interrupt();
+ ResourceAllocationException failure = new ResourceAllocationException("Interrupted attempting lock: " + this);
+ // Keep the original cause available to whoever handles the JMSException
+ failure.setLinkedException(e);
+ throw failure;
+ }
+ }
+
+ /**
+ * Release the managed connection lock if the calling thread holds it.
+ *
+ * The lock field is replaced with a fresh ReentrantLock when the connection
+ * is cleaned up, so a caller that locked the previous instance may arrive
+ * here without owning the current one. Unlocking a ReentrantLock that the
+ * current thread does not hold throws IllegalMonitorStateException, hence
+ * the ownership check.
+ */
+ protected void unlock()
+ {
+ if (lock.isHeldByCurrentThread())
+ lock.unlock();
+ }
+
/**
* Add a connection event listener.
*
@@ -412,6 +452,7 @@
if (log.isTraceEnabled())
log.trace("XAResource=" + xaResource);
+ xaResource = new JmsXAResource(this, xaResource);
return xaResource;
}
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -57,6 +57,9 @@
/** For local access. */
private JMSProviderAdapter adapter;
+
+ /** The try lock */
+ private int useTryLock = 0;
public JmsManagedConnectionFactory()
{
@@ -313,7 +316,27 @@
{
return adapter;
}
+
+ /**
+ * Get the useTryLock.
+ *
+ * @return the useTryLock.
+ */
+ public int getUseTryLock()
+ {
+ return useTryLock;
+ }
+ /**
+ * Set the useTryLock.
+ *
+ * @param useTryLock the useTryLock.
+ */
+ public void setUseTryLock(int useTryLock)
+ {
+ this.useTryLock = useTryLock;
+ }
+
private ConnectionRequestInfo getInfo(ConnectionRequestInfo info)
{
if (info == null)
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -101,54 +101,86 @@
public Message receive() throws JMSException
{
- if (trace)
- log.trace("receive " + this);
- checkState();
- Message message = consumer.receive();
- if (trace)
- log.trace("received " + this + " result=" + message);
- if (message == null)
- return null;
- else
- return wrapMessage(message);
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("receive " + this);
+ checkState();
+ Message message = consumer.receive();
+ if (trace)
+ log.trace("received " + this + " result=" + message);
+ if (message == null)
+ return null;
+ else
+ return wrapMessage(message);
+ }
+ finally
+ {
+ session.unlock();
+ }
}
public Message receive(long timeout) throws JMSException
{
- if (trace)
- log.trace("receive " + this + " timeout=" + timeout);
- checkState();
- Message message = consumer.receive(timeout);
- if (trace)
- log.trace("received " + this + " result=" + message);
- if (message == null)
- return null;
- else
- return wrapMessage(message);
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("receive " + this + " timeout=" + timeout);
+ checkState();
+ Message message = consumer.receive(timeout);
+ if (trace)
+ log.trace("received " + this + " result=" + message);
+ if (message == null)
+ return null;
+ else
+ return wrapMessage(message);
+ }
+ finally
+ {
+ session.unlock();
+ }
}
public Message receiveNoWait() throws JMSException
{
- if (trace)
- log.trace("receiveNoWait " + this);
- checkState();
- Message message = consumer.receiveNoWait();
- if (trace)
- log.trace("received " + this + " result=" + message);
- if (message == null)
- return null;
- else
- return wrapMessage(message);
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("receiveNoWait " + this);
+ checkState();
+ Message message = consumer.receiveNoWait();
+ if (trace)
+ log.trace("received " + this + " result=" + message);
+ if (message == null)
+ return null;
+ else
+ return wrapMessage(message);
+ }
+ finally
+ {
+ session.unlock();
+ }
}
public void setMessageListener(MessageListener listener) throws JMSException
{
- checkState();
- session.checkStrict();
- if (listener == null)
- consumer.setMessageListener(null);
- else
- consumer.setMessageListener(wrapMessageListener(listener));
+ session.lock();
+ try
+ {
+ checkState();
+ session.checkStrict();
+ if (listener == null)
+ consumer.setMessageListener(null);
+ else
+ consumer.setMessageListener(wrapMessageListener(listener));
+ }
+ finally
+ {
+ session.unlock();
+ }
}
void closeConsumer() throws JMSException
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -79,42 +79,74 @@
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
throws JMSException
{
- if (trace)
- log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
- checkState();
- producer.send(destination, message, deliveryMode, priority, timeToLive);
- if (trace)
- log.trace("sent " + this + " result=" + message);
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+ checkState();
+ producer.send(destination, message, deliveryMode, priority, timeToLive);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+ finally
+ {
+ session.unlock();
+ }
}
public void send(Destination destination, Message message) throws JMSException
{
- if (trace)
- log.trace("send " + this + " destination=" + destination + " message=" + message);
- checkState();
- producer.send(destination, message);
- if (trace)
- log.trace("sent " + this + " result=" + message);
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("send " + this + " destination=" + destination + " message=" + message);
+ checkState();
+ producer.send(destination, message);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+ finally
+ {
+ session.unlock();
+ }
}
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
{
- if (trace)
- log.trace("send " + this + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
- checkState();
- producer.send(message, deliveryMode, priority, timeToLive);
- if (trace)
- log.trace("sent " + this + " result=" + message);
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("send " + this + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+ checkState();
+ producer.send(message, deliveryMode, priority, timeToLive);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+ finally
+ {
+ session.unlock();
+ }
}
public void send(Message message) throws JMSException
{
- if (trace)
- log.trace("send " + this + " message=" + message);
- checkState();
- producer.send(message);
- if (trace)
- log.trace("sent " + this + " result=" + message);
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("send " + this + " message=" + message);
+ checkState();
+ producer.send(message);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+ finally
+ {
+ session.unlock();
+ }
}
public int getDeliveryMode() throws JMSException
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -59,21 +59,37 @@
public void send(Queue destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
{
- if (trace)
- log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
- checkState();
- producer.send(destination, message, deliveryMode, priority, timeToLive);
- if (trace)
- log.trace("sent " + this + " result=" + message);
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+ checkState();
+ producer.send(destination, message, deliveryMode, priority, timeToLive);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+ finally
+ {
+ session.unlock();
+ }
}
public void send(Queue destination, Message message) throws JMSException
{
- if (trace)
- log.trace("send " + this + " destination=" + destination + " message=" + message);
- checkState();
- producer.send(destination, message);
- if (trace)
- log.trace("sent " + this + " result=" + message);
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("send " + this + " destination=" + destination + " message=" + message);
+ checkState();
+ producer.send(destination, message);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+ finally
+ {
+ session.unlock();
+ }
}
}
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -22,6 +22,7 @@
package org.jboss.resource.adapter.jms;
import java.io.Serializable;
+import java.sql.SQLException;
import java.util.HashSet;
import java.util.Iterator;
@@ -41,6 +42,7 @@
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
+import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
@@ -102,6 +104,24 @@
this.sf = sf;
}
+ /**
+ * Lock the managed connection currently associated with this session.
+ *
+ * @throws JMSException a ResourceAllocationException when no managed
+ * connection is associated, or whatever the managed connection's
+ * tryLock() reports
+ */
+ protected void lock() throws JMSException
+ {
+ // Work on a snapshot of the field so the null check and the call use the same reference
+ final JmsManagedConnection managed = this.mc;
+ if (managed == null)
+ throw new ResourceAllocationException("Connection is not associated with a managed connection. " + this);
+ managed.tryLock();
+ }
+
+ /**
+ * Unlock the managed connection currently associated with this session, if any.
+ */
+ protected void unlock()
+ {
+ // Work on a snapshot of the field so the null check and the call use the same reference
+ final JmsManagedConnection managed = this.mc;
+ if (managed == null)
+ {
+ // No longer associated: the lock is recreated when the managed connection
+ // returns to the pool, so skipping the unlock here is not important
+ return;
+ }
+ managed.unlock();
+ }
+
/**
* Ensure that the session is opened.
*
@@ -248,32 +268,56 @@
// FIXME - is this really OK, probably not
public void commit() throws JMSException
{
- Session session = getSession();
- if (info.isTransacted() == false)
- throw new IllegalStateException("Session is not transacted");
- if (trace)
- log.trace("Commit session " + this);
- session.commit();
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (info.isTransacted() == false)
+ throw new IllegalStateException("Session is not transacted");
+ if (trace)
+ log.trace("Commit session " + this);
+ session.commit();
+ }
+ finally
+ {
+ unlock();
+ }
}
public void rollback() throws JMSException
{
- Session session = getSession();
- if (info.isTransacted() == false)
- throw new IllegalStateException("Session is not transacted");
- if (trace)
- log.trace("Rollback session " + this);
- session.rollback();
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (info.isTransacted() == false)
+ throw new IllegalStateException("Session is not transacted");
+ if (trace)
+ log.trace("Rollback session " + this);
+ session.rollback();
+ }
+ finally
+ {
+ unlock();
+ }
}
public void recover() throws JMSException
{
- Session session = getSession();
- if (info.isTransacted())
- throw new IllegalStateException("Session is transacted");
- if (trace)
- log.trace("Recover session " + this);
- session.recover();
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (info.isTransacted())
+ throw new IllegalStateException("Session is transacted");
+ if (trace)
+ log.trace("Recover session " + this);
+ session.recover();
+ }
+ finally
+ {
+ unlock();
+ }
}
// --- TopicSession API
@@ -296,28 +340,44 @@
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
- TopicSession session = getTopicSession();
- if (trace)
- log.trace("createSubscriber " + session + " topic=" + topic);
- TopicSubscriber result = session.createSubscriber(topic);
- result = new JmsTopicSubscriber(result, this);
- if (trace)
- log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ TopicSession session = getTopicSession();
+ if (trace)
+ log.trace("createSubscriber " + session + " topic=" + topic);
+ TopicSubscriber result = session.createSubscriber(topic);
+ result = new JmsTopicSubscriber(result, this);
+ if (trace)
+ log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
- TopicSession session = getTopicSession();
- if (trace)
- log.trace("createSubscriber " + session + " topic=" + topic + " selector=" + messageSelector + " noLocal=" + noLocal);
- TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal);
- result = new JmsTopicSubscriber(result, this);
- if (trace)
- log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ TopicSession session = getTopicSession();
+ if (trace)
+ log.trace("createSubscriber " + session + " topic=" + topic + " selector=" + messageSelector + " noLocal=" + noLocal);
+ TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal);
+ result = new JmsTopicSubscriber(result, this);
+ if (trace)
+ log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
@@ -327,42 +387,66 @@
throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");
}
- Session session = getSession();
- if (trace)
- log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name);
- TopicSubscriber result = session.createDurableSubscriber(topic, name);
- result = new JmsTopicSubscriber(result, this);
- if (trace)
- log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name);
+ TopicSubscriber result = session.createDurableSubscriber(topic, name);
+ result = new JmsTopicSubscriber(result, this);
+ if (trace)
+ log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
throws JMSException
{
- Session session = getSession();
- if (trace)
- log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name + " selector=" + messageSelector + " noLocal=" + noLocal);
- TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal);
- result = new JmsTopicSubscriber(result, this);
- if (trace)
- log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name + " selector=" + messageSelector + " noLocal=" + noLocal);
+ TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal);
+ result = new JmsTopicSubscriber(result, this);
+ if (trace)
+ log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
- TopicSession session = getTopicSession();
- if (trace)
- log.trace("createPublisher " + session + " topic=" + topic);
- TopicPublisher result = session.createPublisher(topic);
- result = new JmsTopicPublisher(result, this);
- if (trace)
- log.trace("createdPublisher " + session + " publisher=" + result);
- addProducer(result);
- return result;
+ lock();
+ try
+ {
+ TopicSession session = getTopicSession();
+ if (trace)
+ log.trace("createPublisher " + session + " topic=" + topic);
+ TopicPublisher result = session.createPublisher(topic);
+ result = new JmsTopicPublisher(result, this);
+ if (trace)
+ log.trace("createdPublisher " + session + " publisher=" + result);
+ addProducer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public TemporaryTopic createTemporaryTopic() throws JMSException
@@ -372,14 +456,22 @@
throw new IllegalStateException("Cannot create temporary topic for javax.jms.QueueSession");
}
- Session session = getSession();
- if (trace)
- log.trace("createTemporaryTopic " + session);
- TemporaryTopic temp = session.createTemporaryTopic();
- if (trace)
- log.trace("createdTemporaryTopic " + session + " temp=" + temp);
- sf.addTemporaryTopic(temp);
- return temp;
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("createTemporaryTopic " + session);
+ TemporaryTopic temp = session.createTemporaryTopic();
+ if (trace)
+ log.trace("createdTemporaryTopic " + session + " temp=" + temp);
+ sf.addTemporaryTopic(temp);
+ return temp;
+ }
+ finally
+ {
+ unlock();
+ }
}
public void unsubscribe(String name) throws JMSException
@@ -389,10 +481,18 @@
throw new IllegalStateException("Cannot unsubscribe for javax.jms.QueueSession");
}
- Session session = getSession();
- if (trace)
- log.trace("unsubscribe " + session + " name=" + name);
- session.unsubscribe(name);
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("unsubscribe " + session + " name=" + name);
+ session.unsubscribe(name);
+ }
+ finally
+ {
+ unlock();
+ }
}
//--- QueueSession API
@@ -445,41 +545,65 @@
public QueueReceiver createReceiver(Queue queue) throws JMSException
{
- QueueSession session = getQueueSession();
- if (trace)
- log.trace("createReceiver " + session + " queue=" + queue);
- QueueReceiver result = session.createReceiver(queue);
- result = new JmsQueueReceiver(result, this);
- if (trace)
- log.trace("createdReceiver " + session + " receiver=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ QueueSession session = getQueueSession();
+ if (trace)
+ log.trace("createReceiver " + session + " queue=" + queue);
+ QueueReceiver result = session.createReceiver(queue);
+ result = new JmsQueueReceiver(result, this);
+ if (trace)
+ log.trace("createdReceiver " + session + " receiver=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
{
- QueueSession session = getQueueSession();
- if (trace)
- log.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector);
- QueueReceiver result = session.createReceiver(queue, messageSelector);
- result = new JmsQueueReceiver(result, this);
- if (trace)
- log.trace("createdReceiver " + session + " receiver=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ QueueSession session = getQueueSession();
+ if (trace)
+ log.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector);
+ QueueReceiver result = session.createReceiver(queue, messageSelector);
+ result = new JmsQueueReceiver(result, this);
+ if (trace)
+ log.trace("createdReceiver " + session + " receiver=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public QueueSender createSender(Queue queue) throws JMSException
{
- QueueSession session = getQueueSession();
- if (trace)
- log.trace("createSender " + session + " queue=" + queue);
- QueueSender result = session.createSender(queue);
- result = new JmsQueueSender(result, this);
- if (trace)
- log.trace("createdSender " + session + " sender=" + result);
- addProducer(result);
- return result;
+ lock();
+ try
+ {
+ QueueSession session = getQueueSession();
+ if (trace)
+ log.trace("createSender " + session + " queue=" + queue);
+ QueueSender result = session.createSender(queue);
+ result = new JmsQueueSender(result, this);
+ if (trace)
+ log.trace("createdSender " + session + " sender=" + result);
+ addProducer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -489,69 +613,110 @@
throw new IllegalStateException("Cannot create temporary queue for javax.jms.TopicSession");
}
- Session session = getSession();
- if (trace)
- log.trace("createTemporaryQueue " + session);
- TemporaryQueue temp = session.createTemporaryQueue();
- if (trace)
- log.trace("createdTemporaryQueue " + session + " temp=" + temp);
- sf.addTemporaryQueue(temp);
- return temp;
+
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("createTemporaryQueue " + session);
+ TemporaryQueue temp = session.createTemporaryQueue();
+ if (trace)
+ log.trace("createdTemporaryQueue " + session + " temp=" + temp);
+ sf.addTemporaryQueue(temp);
+ return temp;
+ }
+ finally
+ {
+ unlock();
+ }
}
// -- JMS 1.1
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
- Session session = getSession();
- if (trace)
- log.trace("createConsumer " + session + " dest=" + destination);
- MessageConsumer result = session.createConsumer(destination);
- result = new JmsMessageConsumer(result, this);
- if (trace)
- log.trace("createdConsumer " + session + " consumer=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("createConsumer " + session + " dest=" + destination);
+ MessageConsumer result = session.createConsumer(destination);
+ result = new JmsMessageConsumer(result, this);
+ if (trace)
+ log.trace("createdConsumer " + session + " consumer=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException // as the one-argument form, additionally passing messageSelector through to the underlying session
{ // diff view: '-' lines are the previous body, '+' lines the new one; same statements, now placed between lock() and unlock()
- Session session = getSession();
- if (trace)
- log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector);
- MessageConsumer result = session.createConsumer(destination, messageSelector);
- result = new JmsMessageConsumer(result, this);
- if (trace)
- log.trace("createdConsumer " + session + " consumer=" + result);
- addConsumer(result);
- return result;
+ lock(); // defined outside this hunk; called before the try, so unlock() in the finally only runs after lock() has returned
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector);
+ MessageConsumer result = session.createConsumer(destination, messageSelector);
+ result = new JmsMessageConsumer(result, this); // wrap the raw consumer; the wrapper is given this session
+ if (trace)
+ log.trace("createdConsumer " + session + " consumer=" + result);
+ addConsumer(result); // registers the wrapper
+ return result;
+ }
+ finally
+ {
+ unlock(); // runs whether the try block returns or throws
+ }
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException // as the two-argument form, additionally passing noLocal through to the underlying session
{ // diff view: '-' lines are the previous body, '+' lines the new one; same statements, now placed between lock() and unlock()
- Session session = getSession();
- if (trace)
- log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector + " noLocal=" + noLocal);
- MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal);
- result = new JmsMessageConsumer(result, this);
- if (trace)
- log.trace("createdConsumer " + session + " consumer=" + result);
- addConsumer(result);
- return result;
+ lock(); // defined outside this hunk; called before the try, so unlock() in the finally only runs after lock() has returned
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector + " noLocal=" + noLocal);
+ MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal);
+ result = new JmsMessageConsumer(result, this); // wrap the raw consumer; the wrapper is given this session
+ if (trace)
+ log.trace("createdConsumer " + session + " consumer=" + result);
+ addConsumer(result); // registers the wrapper
+ return result;
+ }
+ finally
+ {
+ unlock(); // runs whether the try block returns or throws
+ }
}
public MessageProducer createProducer(Destination destination) throws JMSException // creates a producer for destination, wraps it in JmsMessageProducer and records it via addProducer()
{ // diff view: '-' lines are the previous body, '+' lines the new one; same statements, now placed between lock() and unlock()
- Session session = getSession();
- if (trace)
- log.trace("createProducer " + session + " dest=" + destination);
- MessageProducer result = getSession().createProducer(destination);
- result = new JmsMessageProducer(result, this);
- if (trace)
- log.trace("createdProducer " + session + " producer=" + result);
- addProducer(result);
- return result;
+ lock(); // defined outside this hunk; called before the try, so unlock() in the finally only runs after lock() has returned
+ try
+ {
+ Session session = getSession(); // used for the trace output below
+ if (trace)
+ log.trace("createProducer " + session + " dest=" + destination);
+ MessageProducer result = getSession().createProducer(destination); // NOTE(review): calls getSession() a second time instead of reusing the local 'session' as the createConsumer methods do (unchanged from the '-' line) - confirm this is intentional
+ result = new JmsMessageProducer(result, this); // wrap the raw producer; the wrapper is given this session
+ if (trace)
+ log.trace("createdProducer " + session + " producer=" + result);
+ addProducer(result); // registers the wrapper
+ return result;
+ }
+ finally
+ {
+ unlock(); // runs whether the try block returns or throws
+ }
}
public int getAcknowledgeMode() throws JMSException
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java 2007-12-20 16:53:00 UTC (rev 68458)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -59,6 +59,14 @@
public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
{
+ session.lock();
+ try
+ {
+ }
+ finally
+ {
+ session.unlock();
+ }
if (trace)
log.trace("send " + this + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
checkState();
@@ -69,32 +77,56 @@
public void publish(Message message) throws JMSException // publishes message through the wrapped producer
{ // diff view: '-' lines are the previous body, '+' lines the new one; same statements, now placed between session.lock() and session.unlock()
- if (trace)
- log.trace("send " + this + " message=" + message);
- checkState();
- ((TopicPublisher) producer).publish(message);
- if (trace)
- log.trace("sent " + this + " result=" + message);
+ session.lock(); // 'session' is declared outside this hunk; lock() is called before the try, so the finally's unlock() only runs after lock() has returned
+ try
+ {
+ if (trace)
+ log.trace("send " + this + " message=" + message);
+ checkState(); // defined outside this hunk; runs before the delegate call
+ ((TopicPublisher) producer).publish(message); // 'producer' is declared outside this hunk and is cast to TopicPublisher for the call
+ if (trace)
+ log.trace("sent " + this + " result=" + message); // the message itself is what gets logged as the result
+ }
+ finally
+ {
+ session.unlock(); // runs whether the try block completes or throws
+ }
}
public void publish(Topic destination, Message message, int deliveryMode, int priority, long timeToLive)
throws JMSException // publishes message to destination through the wrapped producer, passing deliveryMode, priority and timeToLive through unchanged
{ // diff view: '-' lines are the previous body, '+' lines the new one; same statements, now placed between session.lock() and session.unlock()
- if (trace)
- log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
- checkState();
- ((TopicPublisher) producer).publish(destination, message, deliveryMode, priority, timeToLive);
- if (trace)
- log.trace("sent " + this + " result=" + message);
+ session.lock(); // 'session' is declared outside this hunk; lock() is called before the try, so the finally's unlock() only runs after lock() has returned
+ try
+ {
+ if (trace)
+ log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+ checkState(); // defined outside this hunk; runs before the delegate call
+ ((TopicPublisher) producer).publish(destination, message, deliveryMode, priority, timeToLive); // 'producer' is declared outside this hunk and is cast to TopicPublisher for the call
+ if (trace)
+ log.trace("sent " + this + " result=" + message); // the message itself is what gets logged as the result
+ }
+ finally
+ {
+ session.unlock(); // runs whether the try block completes or throws
+ }
}
public void publish(Topic destination, Message message) throws JMSException // publishes message to destination through the wrapped producer
{ // diff view: '-' lines are the previous body, '+' lines the new one; same statements, now placed between session.lock() and session.unlock()
- if (trace)
- log.trace("send " + this + " destination=" + destination + " message=" + message);
- checkState();
- ((TopicPublisher) producer).publish(destination, message);
- if (trace)
- log.trace("sent " + this + " result=" + message);
+ session.lock(); // 'session' is declared outside this hunk; lock() is called before the try, so the finally's unlock() only runs after lock() has returned
+ try
+ {
+ if (trace)
+ log.trace("send " + this + " destination=" + destination + " message=" + message);
+ checkState(); // defined outside this hunk; runs before the delegate call
+ ((TopicPublisher) producer).publish(destination, message); // 'producer' is declared outside this hunk and is cast to TopicPublisher for the call
+ if (trace)
+ log.trace("sent " + this + " result=" + message); // the message itself is what gets logged as the result
+ }
+ finally
+ {
+ session.unlock(); // runs whether the try block completes or throws
+ }
}
}
Added: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java (rev 0)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java 2007-12-20 16:54:27 UTC (rev 68459)
@@ -0,0 +1,153 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2007, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.resource.adapter.jms;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+/**
+ * JmsXAResource.
+ *
+ * <p>An {@link XAResource} that forwards every call to another XAResource
+ * supplied at construction time. For start, end, prepare, commit, rollback
+ * and forget the call is made between {@code managedConnection.lock()} and
+ * {@code managedConnection.unlock()}, with the unlock in a finally block so
+ * it also runs when the delegate throws. isSameRM, recover and the two
+ * transaction timeout methods are forwarded without touching the managed
+ * connection.
+ *
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @version $Revision: 1.1 $
+ */
+public class JmsXAResource implements XAResource
+{
+ /** The managed connection whose lock()/unlock() bracket the delegated transaction calls */
+ private JmsManagedConnection managedConnection;
+
+ /** The wrapped XA resource that every method forwards to */
+ private XAResource xaResource;
+
+ /**
+ * Create a new JmsXAResource.
+ *
+ * Both arguments are stored as given; nothing is validated or called here.
+ *
+ * @param managedConnection the managed connection to lock around delegated calls
+ * @param xaResource the xa resource to delegate to
+ */
+ public JmsXAResource(JmsManagedConnection managedConnection, XAResource xaResource)
+ {
+ this.managedConnection = managedConnection;
+ this.xaResource = xaResource;
+ }
+
+ public void start(Xid xid, int flags) throws XAException // forwards to xaResource.start between managedConnection.lock() and unlock()
+ {
+ managedConnection.lock(); // outside the try, so unlock() below only runs after lock() has returned
+ try
+ {
+ xaResource.start(xid, flags);
+ }
+ finally
+ {
+ managedConnection.unlock(); // runs whether the delegate returns or throws XAException
+ }
+ }
+
+ public void end(Xid xid, int flags) throws XAException // forwards to xaResource.end between managedConnection.lock() and unlock()
+ {
+ managedConnection.lock();
+ try
+ {
+ xaResource.end(xid, flags);
+ }
+ finally
+ {
+ managedConnection.unlock();
+ }
+ }
+
+ public int prepare(Xid xid) throws XAException // forwards to xaResource.prepare between lock() and unlock() and returns the delegate's vote unchanged
+ {
+ managedConnection.lock();
+ try
+ {
+ return xaResource.prepare(xid); // the finally block still runs before this value reaches the caller
+ }
+ finally
+ {
+ managedConnection.unlock();
+ }
+ }
+
+ public void commit(Xid xid, boolean onePhase) throws XAException // forwards to xaResource.commit between managedConnection.lock() and unlock()
+ {
+ managedConnection.lock();
+ try
+ {
+ xaResource.commit(xid, onePhase);
+ }
+ finally
+ {
+ managedConnection.unlock();
+ }
+ }
+
+ public void rollback(Xid xid) throws XAException // forwards to xaResource.rollback between managedConnection.lock() and unlock()
+ {
+ managedConnection.lock();
+ try
+ {
+ xaResource.rollback(xid);
+ }
+ finally
+ {
+ managedConnection.unlock();
+ }
+ }
+
+ public void forget(Xid xid) throws XAException // forwards to xaResource.forget between managedConnection.lock() and unlock()
+ {
+ managedConnection.lock();
+ try
+ {
+ xaResource.forget(xid);
+ }
+ finally
+ {
+ managedConnection.unlock();
+ }
+ }
+
+ public boolean isSameRM(XAResource xaRes) throws XAException // forwarded without locking
+ {
+ return xaResource.isSameRM(xaRes); // xaRes is passed through as given; NOTE(review): it is not unwrapped if it is itself a JmsXAResource - confirm the delegate copes with that
+ }
+
+ public Xid[] recover(int flag) throws XAException // forwarded without locking
+ {
+ return xaResource.recover(flag);
+ }
+
+ public int getTransactionTimeout() throws XAException // forwarded without locking
+ {
+ return xaResource.getTransactionTimeout();
+ }
+
+ public boolean setTransactionTimeout(int seconds) throws XAException // forwarded without locking
+ {
+ return xaResource.setTransactionTimeout(seconds);
+ }
+
+
+}
More information about the jboss-cvs-commits
mailing list