[jboss-cvs] JBossAS SVN: r71464 - in branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter: jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Mar 31 05:20:48 EDT 2008


Author: vicky.kak at jboss.com
Date: 2008-03-31 05:20:47 -0400 (Mon, 31 Mar 2008)
New Revision: 71464

Added:
   branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java
   branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java
Modified:
   branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedPreparedStatement.java
   branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java
Log:
[JBPAPP-662]Add a lock for the session to avoid racing between jms activity and asynchronous rollback

Modified: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedPreparedStatement.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedPreparedStatement.java	2008-03-31 09:18:59 UTC (rev 71463)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jdbc/WrappedPreparedStatement.java	2008-03-31 09:20:47 UTC (rev 71464)
@@ -59,15 +59,23 @@
 
    public Statement getUnderlyingStatement() throws SQLException
    {
-      checkState();
-      if (ps instanceof CachedPreparedStatement)
-      {
-         return ((CachedPreparedStatement)ps).getUnderlyingPreparedStatement();
-      }
-      else
-      {
-         return ps;
-      }
+	  lock();
+	  try
+	  {
+	      checkTransaction();
+	      if (ps instanceof CachedPreparedStatement)
+	      {
+	         return ((CachedPreparedStatement)ps).getUnderlyingPreparedStatement();
+	      }
+	      else
+	      {
+	         return ps;
+	      }
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public void setBoolean(int parameterIndex, boolean value) throws SQLException
@@ -202,16 +210,24 @@
 
    public boolean execute() throws SQLException
    {
-      checkTransaction();
-      try 
-      {
-         checkConfiguredQueryTimeout();
-         return ps.execute();         
-      }
-      catch (Throwable t)
-      {
-         throw checkException(t);
-      }
+	  lock();
+	  try
+	  {
+	      checkTransaction();
+	      try 
+	      {
+	         checkConfiguredQueryTimeout();
+	         return ps.execute();         
+	      }
+	      catch (Throwable t)
+	      {
+	         throw checkException(t);
+	      }
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public ResultSetMetaData getMetaData() throws SQLException
@@ -229,44 +245,68 @@
 
    public ResultSet executeQuery() throws SQLException
    {
-      checkTransaction();
-      try 
-      {
-         checkConfiguredQueryTimeout();
-         ResultSet resultSet = ps.executeQuery();
-         return registerResultSet(resultSet);
-      }
-      catch (Throwable t)
-      {
-         throw checkException(t);
-      }
+	  lock();
+	  try
+	  {
+	      checkTransaction();
+	      try 
+	      {
+	         checkConfiguredQueryTimeout();
+	         ResultSet resultSet = ps.executeQuery();
+	         return registerResultSet(resultSet);
+	      }
+	      catch (Throwable t)
+	      {
+	         throw checkException(t);
+	      }
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public int executeUpdate() throws SQLException
    {
-      checkTransaction();
-      try 
-      {
-         checkConfiguredQueryTimeout();
-         return ps.executeUpdate();         
-      }
-      catch (Throwable t)
-      {
-         throw checkException(t);
-      }
+	  lock();
+	  try
+	  {
+	      checkTransaction();
+	      try 
+	      {
+	         checkConfiguredQueryTimeout();
+	         return ps.executeUpdate();         
+	      }
+	      catch (Throwable t)
+	      {
+	         throw checkException(t);
+	      }
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public void addBatch() throws SQLException
    {
-      checkState();
-      try 
-      {
-         ps.addBatch();         
-      }
-      catch (Throwable t)
-      {
-         throw checkException(t);
-      }
+	  lock();
+	  try
+	  {
+	      checkTransaction();
+	      try 
+	      {
+	         ps.addBatch();         
+	      }
+	      catch (Throwable t)
+	      {
+	         throw checkException(t);
+	      }
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public void setNull(int parameterIndex, int sqlType) throws SQLException

Modified: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java	2008-03-31 09:18:59 UTC (rev 71463)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java	2008-03-31 09:20:47 UTC (rev 71464)
@@ -49,27 +49,43 @@
 
 	public void commit() throws ResourceException
 	{
+		mc.lock();
 		try
 		{
-			if (mc.getSession().getTransacted())
-				mc.getSession().commit();
+			try
+			{
+				if (mc.getSession().getTransacted())
+					mc.getSession().commit();
+			}
+			catch (JMSException e)
+			{
+				throw new JBossResourceException("Could not commit LocalTransaction", e);
+			}
 		}
-		catch (JMSException e)
+		finally
 		{
-			throw new JBossResourceException("Could not commit LocalTransaction", e);
+			mc.unlock();
 		}
 	}
 
 	public void rollback() throws ResourceException
 	{
+		mc.lock();
 		try
 		{
-			if (mc.getSession().getTransacted())
-				mc.getSession().rollback();
+			try
+			{
+				if (mc.getSession().getTransacted())
+					mc.getSession().rollback();
+			}
+			catch (JMSException ex)
+			{
+				throw new JBossResourceException("Could not rollback LocalTransaction", ex);
+			}
 		}
-		catch (JMSException ex)
+		finally
 		{
-			throw new JBossResourceException("Could not rollback LocalTransaction", ex);
+			mc.unlock();
 		}
 	}
 }

Added: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java	                        (rev 0)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java	2008-03-31 09:20:47 UTC (rev 71464)
@@ -0,0 +1,95 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.resource.adapter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+import org.jboss.logging.Logger;
+
+/**
+ * JmsQueueSender.
+ * 
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @version $Revision: 1.1 $
+ */
+public class JmsQueueSender extends JmsMessageProducer implements QueueSender
+{
+   private static final Logger log = Logger.getLogger(JmsQueueSender.class);
+   
+   /** Whether trace is enabled */
+   private boolean trace = log.isTraceEnabled();
+
+   /**
+    * Create a new wrapper
+    * 
+    * @param producer the producer
+    * @param session the session
+    */
+   public JmsQueueSender(QueueSender producer, JmsSession session)
+   {
+      super(producer, session);
+   }
+
+   public Queue getQueue() throws JMSException
+   {
+      return ((QueueSender) producer).getQueue();
+   }
+
+   public void send(Queue destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+         checkState();
+         producer.send(destination, message, deliveryMode, priority, timeToLive);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   public void send(Queue destination, Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message);
+         checkState();
+         producer.send(destination, message);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+}

Added: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java	                        (rev 0)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java	2008-03-31 09:20:47 UTC (rev 71464)
@@ -0,0 +1,153 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2007, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.resource.adapter.jms;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+/**
+ * JmsXAResource.
+ * 
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @version $Revision: 1.1 $
+ */
+public class JmsXAResource implements XAResource
+{
+   /** The managed connection */
+   private JmsManagedConnection managedConnection;
+   
+   /** The resource */
+   private XAResource xaResource;
+
+   /**
+    * Create a new JmsXAResource.
+    * 
+    * @param managedConnection the managed connection
+    * @param xaResource the xa resource
+    */
+   public JmsXAResource(JmsManagedConnection managedConnection, XAResource xaResource)
+   {
+      this.managedConnection = managedConnection;
+      this.xaResource = xaResource;
+   }
+
+   public void start(Xid xid, int flags) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.start(xid, flags);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public void end(Xid xid, int flags) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.end(xid, flags);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public int prepare(Xid xid) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         return xaResource.prepare(xid);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public void commit(Xid xid, boolean onePhase) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.commit(xid, onePhase);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public void rollback(Xid xid) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.rollback(xid);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public void forget(Xid xid) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.forget(xid);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public boolean isSameRM(XAResource xaRes) throws XAException
+   {
+      return xaResource.isSameRM(xaRes);
+   }
+
+   public Xid[] recover(int flag) throws XAException
+   {
+      return xaResource.recover(flag);
+   }
+
+   public int getTransactionTimeout() throws XAException
+   {
+      return xaResource.getTransactionTimeout();
+   }
+
+   public boolean setTransactionTimeout(int seconds) throws XAException
+   {
+      return xaResource.setTransactionTimeout(seconds);
+   }
+   
+   
+}




More information about the jboss-cvs-commits mailing list