[jboss-cvs] JBossAS SVN: r71790 - trunk/connector/src/main/org/jboss/resource/adapter/jms.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Apr 8 10:00:11 EDT 2008


Author: adrian at jboss.org
Date: 2008-04-08 10:00:11 -0400 (Tue, 08 Apr 2008)
New Revision: 71790

Added:
   trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java
Modified:
   trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsConnectionFactoryImpl.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueReceiver.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsResourceAdapter.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsSessionFactoryImpl.java
   trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicSubscriber.java
Log:
[JBAS-5278] - Port JMS fixes from 4.2

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsConnectionFactoryImpl.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsConnectionFactoryImpl.java	2008-04-08 13:46:40 UTC (rev 71789)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsConnectionFactoryImpl.java	2008-04-08 14:00:11 UTC (rev 71790)
@@ -21,6 +21,8 @@
  */
 package org.jboss.resource.adapter.jms;
 
+import java.sql.SQLException;
+
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.QueueConnection;
@@ -31,6 +33,8 @@
 import javax.resource.spi.ManagedConnectionFactory;
 
 import org.jboss.logging.Logger;
+import org.jboss.resource.connectionmanager.JTATransactionChecker;
+import org.jboss.util.NestedSQLException;
 
 /**
  * The the connection factory implementation for the JMS RA.

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java	2008-04-08 13:46:40 UTC (rev 71789)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java	2008-04-08 14:00:11 UTC (rev 71790)
@@ -49,6 +49,7 @@
 
 	public void commit() throws ResourceException
 	{
+	    mc.lock();
 		try
 		{
 			if (mc.getSession().getTransacted())
@@ -58,10 +59,15 @@
 		{
 			throw new JBossResourceException("Could not commit LocalTransaction", e);
 		}
+		finally
+		{
+		   mc.unlock();
+		}
 	}
 
 	public void rollback() throws ResourceException
 	{
+	    mc.lock();
 		try
 		{
 			if (mc.getSession().getTransacted())
@@ -71,5 +77,9 @@
 		{
 			throw new JBossResourceException("Could not rollback LocalTransaction", ex);
 		}
+		finally
+		{
+		   mc.unlock();
+		}
 	}
 }

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java	2008-04-08 13:46:40 UTC (rev 71789)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java	2008-04-08 14:00:11 UTC (rev 71790)
@@ -27,12 +27,15 @@
 import java.util.Iterator;
 import java.util.Set;
 import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jms.Connection;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.QueueConnection;
 import javax.jms.QueueSession;
+import javax.jms.ResourceAllocationException;
 import javax.jms.Session;
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
@@ -141,6 +144,8 @@
    private String pwd;
    private boolean isDestroyed;
 
+   private ReentrantLock lock = new ReentrantLock();
+   
    // Physical JMS connection stuff
    private Connection con;
    private Session session;
@@ -332,6 +337,12 @@
 
       // destory handles
       destroyHandles();
+
+      // I'm recreating the lock object when we return to the pool
+      // because it looks too nasty to expect the connection handle
+      // to unlock properly in certain race conditions
+      // where the dissociation of the managed connection is "random".
+      lock = new ReentrantLock();
    }
 
    /**
@@ -360,6 +371,35 @@
             ("ManagedConnection in an illegal state");
    }
 
+   protected void lock()
+   {
+      lock.lock();
+   }
+
+   protected void tryLock() throws JMSException
+   {
+      int tryLock = mcf.getUseTryLock();
+      if (tryLock <= 0)
+      {
+         lock();
+         return;
+      }
+      try
+      {
+         if (lock.tryLock(tryLock, TimeUnit.SECONDS) == false)
+            throw new ResourceAllocationException("Unable to obtain lock in " + tryLock + " seconds: " + this);
+      }
+      catch (InterruptedException e)
+      {
+         throw new ResourceAllocationException("Interrupted attempting lock: " + this);
+      }
+   }
+   
+   protected void unlock()
+   {
+      lock.unlock();
+   }
+
    /**
     * Add a connection event listener.
     *
@@ -412,6 +452,7 @@
       if (log.isTraceEnabled())
          log.trace("XAResource=" + xaResource);
 
+      xaResource = new JmsXAResource(this, xaResource);
       return xaResource;
    }
 

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java	2008-04-08 13:46:40 UTC (rev 71789)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java	2008-04-08 14:00:11 UTC (rev 71790)
@@ -57,6 +57,9 @@
 
    /** For local access. */
    private JMSProviderAdapter adapter;
+   
+   /** The try lock */
+   private int useTryLock = 0;
 
    public JmsManagedConnectionFactory()
    {
@@ -108,12 +111,6 @@
       if (trace)
          log.trace("created new managed connection: " + mc);
 
-      // Set default logwriter according to spec
-
-      // 
-      // jason: screw the logWriter stuff for now it sucks ass
-      //
-
       return mc;
    }
 
@@ -319,7 +316,27 @@
    {
       return adapter;
    }
+   
+   /**
+    * Get the useTryLock.
+    * 
+    * @return the useTryLock.
+    */
+   public int getUseTryLock()
+   {
+      return useTryLock;
+   }
 
+   /**
+    * Set the useTryLock.
+    * 
+    * @param useTryLock the useTryLock.
+    */
+   public void setUseTryLock(int useTryLock)
+   {
+      this.useTryLock = useTryLock;
+   }
+
    private ConnectionRequestInfo getInfo(ConnectionRequestInfo info)
    {
       if (info == null)

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java	2008-04-08 13:46:40 UTC (rev 71789)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java	2008-04-08 14:00:11 UTC (rev 71790)
@@ -80,64 +80,107 @@
          session.removeConsumer(this);
       }
    }
+
+   void checkState() throws JMSException
+   {
+      session.checkTransactionActive();
+   }
    
    public MessageListener getMessageListener() throws JMSException
    {
+      checkState();
       session.checkStrict();
       return consumer.getMessageListener();
    }
    
    public String getMessageSelector() throws JMSException
    {
+      checkState();
       return consumer.getMessageSelector();
    }
    
    public Message receive() throws JMSException
    {
-      if (trace)
-         log.trace("receive " + this);
-      Message message = consumer.receive();
-      if (trace)
-         log.trace("received " + this + " result=" + message);
-      if (message == null)
-         return null;
-      else
-         return wrapMessage(message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("receive " + this);
+         checkState();
+         Message message = consumer.receive();
+         if (trace)
+            log.trace("received " + this + " result=" + message);
+         if (message == null)
+            return null;
+         else
+            return wrapMessage(message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 
    public Message receive(long timeout) throws JMSException
    {
-      if (trace)
-         log.trace("receive " + this + " timeout=" + timeout);
-      Message message = consumer.receive(timeout);
-      if (trace)
-         log.trace("received " + this + " result=" + message);
-      if (message == null)
-         return null;
-      else
-         return wrapMessage(message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("receive " + this + " timeout=" + timeout);
+         checkState();
+         Message message = consumer.receive(timeout);
+         if (trace)
+            log.trace("received " + this + " result=" + message);
+         if (message == null)
+            return null;
+         else
+            return wrapMessage(message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 
    public Message receiveNoWait() throws JMSException
    {
-      if (trace)
-         log.trace("receiveNoWait " + this);
-      Message message = consumer.receiveNoWait();
-      if (trace)
-         log.trace("received " + this + " result=" + message);
-      if (message == null)
-         return null;
-      else
-         return wrapMessage(message);
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("receiveNoWait " + this);
+         checkState();
+         Message message = consumer.receiveNoWait();
+         if (trace)
+            log.trace("received " + this + " result=" + message);
+         if (message == null)
+            return null;
+         else
+            return wrapMessage(message);
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
    
    public void setMessageListener(MessageListener listener) throws JMSException
    {
-      session.checkStrict();
-      if (listener == null)
-         consumer.setMessageListener(null);
-      else
-         consumer.setMessageListener(wrapMessageListener(listener));
+      session.lock();
+      try
+      {
+         checkState();
+         session.checkStrict();
+         if (listener == null)
+            consumer.setMessageListener(null);
+         else
+            consumer.setMessageListener(wrapMessageListener(listener));
+      }
+      finally
+      {
+         session.unlock();
+      }
    }
 
    void closeConsumer() throws JMSException

Added: trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java	                        (rev 0)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java	2008-04-08 14:00:11 UTC (rev 71790)
@@ -0,0 +1,216 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.resource.adapter.jms;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+import org.jboss.logging.Logger;
+
+/**
+ * JmsMessageProducer.
+ * 
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @version $Revision: 1.1 $
+ */
+public class JmsMessageProducer implements MessageProducer
+{
+   private static final Logger log = Logger.getLogger(JmsMessageConsumer.class);
+
+   /** The wrapped message producer */
+   MessageProducer producer;
+   
+   /** The session for this consumer */
+   JmsSession session;
+   
+   /** Whether trace is enabled */
+   private boolean trace = log.isTraceEnabled();
+
+   /**
+    * Create a new wrapper
+    * 
+    * @param producer the producer
+    * @param session the session
+    */
+   public JmsMessageProducer(MessageProducer producer, JmsSession session)
+   {
+      this.producer = producer;
+      this.session = session;
+      
+      if (trace)
+         log.trace("new JmsMessageProducer " + this + " producer=" + producer + " session=" + session);
+   }
+
+   public void close() throws JMSException
+   {
+      if (trace)
+         log.trace("close " + this);
+      try
+      {
+         closeProducer();
+      }
+      finally
+      {
+         session.removeProducer(this);
+      }
+   }
+
+   public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
+         throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+         checkState();
+         producer.send(destination, message, deliveryMode, priority, timeToLive);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   public void send(Destination destination, Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message);
+         checkState();
+         producer.send(destination, message);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+         checkState();
+         producer.send(message, deliveryMode, priority, timeToLive);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   public void send(Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " message=" + message);
+         checkState();
+         producer.send(message);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   public int getDeliveryMode() throws JMSException
+   {
+      return producer.getDeliveryMode();
+   }
+
+   public Destination getDestination() throws JMSException
+   {
+      return producer.getDestination();
+   }
+
+   public boolean getDisableMessageID() throws JMSException
+   {
+      return producer.getDisableMessageID();
+   }
+
+   public boolean getDisableMessageTimestamp() throws JMSException
+   {
+      return producer.getDisableMessageTimestamp();
+   }
+
+   public int getPriority() throws JMSException
+   {
+      return producer.getPriority();
+   }
+
+   public long getTimeToLive() throws JMSException
+   {
+      return producer.getTimeToLive();
+   }
+
+   public void setDeliveryMode(int deliveryMode) throws JMSException
+   {
+      producer.setDeliveryMode(deliveryMode);
+   }
+
+   public void setDisableMessageID(boolean value) throws JMSException
+   {
+      producer.setDisableMessageID(value);
+   }
+
+   public void setDisableMessageTimestamp(boolean value) throws JMSException
+   {
+      producer.setDisableMessageTimestamp(value);
+   }
+
+   public void setPriority(int defaultPriority) throws JMSException
+   {
+      producer.setPriority(defaultPriority);
+   }
+
+   public void setTimeToLive(long timeToLive) throws JMSException
+   {
+      producer.setTimeToLive(timeToLive);
+   }
+
+   void checkState() throws JMSException
+   {
+      session.checkTransactionActive();
+   }
+
+   void closeProducer() throws JMSException
+   {
+      producer.close();
+   }
+}

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueReceiver.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueReceiver.java	2008-04-08 13:46:40 UTC (rev 71789)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueReceiver.java	2008-04-08 14:00:11 UTC (rev 71790)
@@ -46,6 +46,7 @@
 
    public Queue getQueue() throws JMSException
    {
+      checkState();
       return ((QueueReceiver) consumer).getQueue();
    }
 }

Added: trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java	                        (rev 0)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java	2008-04-08 14:00:11 UTC (rev 71790)
@@ -0,0 +1,95 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.resource.adapter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+import org.jboss.logging.Logger;
+
+/**
+ * JmsQueueSender.
+ * 
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @version $Revision: 1.1 $
+ */
+public class JmsQueueSender extends JmsMessageProducer implements QueueSender
+{
+   private static final Logger log = Logger.getLogger(JmsQueueSender.class);
+   
+   /** Whether trace is enabled */
+   private boolean trace = log.isTraceEnabled();
+
+   /**
+    * Create a new wrapper
+    * 
+    * @param producer the producer
+    * @param session the session
+    */
+   public JmsQueueSender(QueueSender producer, JmsSession session)
+   {
+      super(producer, session);
+   }
+
+   public Queue getQueue() throws JMSException
+   {
+      return ((QueueSender) producer).getQueue();
+   }
+
+   public void send(Queue destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+         checkState();
+         producer.send(destination, message, deliveryMode, priority, timeToLive);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   public void send(Queue destination, Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message);
+         checkState();
+         producer.send(destination, message);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+}

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsResourceAdapter.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsResourceAdapter.java	2008-04-08 13:46:40 UTC (rev 71789)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsResourceAdapter.java	2008-04-08 14:00:11 UTC (rev 71790)
@@ -29,7 +29,6 @@
 import javax.resource.spi.BootstrapContext;
 import javax.resource.spi.ResourceAdapter;
 import javax.resource.spi.ResourceAdapterInternalException;
-import javax.resource.spi.XATerminator;
 import javax.resource.spi.endpoint.MessageEndpointFactory;
 import javax.resource.spi.work.WorkManager;
 import javax.transaction.xa.XAResource;
@@ -66,11 +65,7 @@
    {
       return ctx.getWorkManager();
    }
-   
-   public XATerminator getTerminator()
-   {
-      return ctx.getXATerminator();
-   }
+
    public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec spec) throws ResourceException
    {
       JmsActivation activation = new JmsActivation(this, endpointFactory, (JmsActivationSpec) spec);

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java	2008-04-08 13:46:40 UTC (rev 71789)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java	2008-04-08 14:00:11 UTC (rev 71790)
@@ -22,6 +22,7 @@
 package org.jboss.resource.adapter.jms;
 
 import java.io.Serializable;
+import java.sql.SQLException;
 import java.util.HashSet;
 import java.util.Iterator;
 
@@ -41,6 +42,7 @@
 import javax.jms.QueueReceiver;
 import javax.jms.QueueSender;
 import javax.jms.QueueSession;
+import javax.jms.ResourceAllocationException;
 import javax.jms.Session;
 import javax.jms.StreamMessage;
 import javax.jms.TemporaryQueue;
@@ -73,7 +75,7 @@
    private JmsConnectionRequestInfo info;
 
    /** The session factory for this session */
-   private JmsSessionFactory sf;
+   private JmsSessionFactoryImpl sf;
    
    /** The message consumers */
    private HashSet consumers = new HashSet();
@@ -97,11 +99,29 @@
          log.trace("new JmsSession " + this + " mc=" + mc + " cri=" + info);
    }
 
-   public void setJmsSessionFactory(JmsSessionFactory sf)
+   public void setJmsSessionFactory(JmsSessionFactoryImpl sf)
    {
       this.sf = sf;
    }
    
+   protected void lock() throws JMSException
+   {
+      JmsManagedConnection mc = this.mc;
+      if (mc != null)
+         mc.tryLock();
+      else
+         throw new ResourceAllocationException("Connection is not associated with a managed connection. " + this);
+   }
+
+   protected void unlock()
+   {
+      JmsManagedConnection mc = this.mc;
+      if (mc != null)
+         mc.unlock();
+      // We recreate the lock when returned to the pool
+      // so missing the unlock after disassociation is not important
+   }
+   
    /**
     * Ensure that the session is opened.
     * 
@@ -115,12 +135,20 @@
       if (mc == null)
          throw new IllegalStateException("The session is closed");
       
+      checkTransactionActive();
+      
       Session session = mc.getSession();
       if (trace)
          log.trace("getSession " + session + " for " + this);
       return session;
    }
 
+   void checkTransactionActive() throws IllegalStateException
+   {
+      if (sf != null)
+         sf.checkTransactionActive();
+   }
+   
    // ---- Session API
 
    public BytesMessage createBytesMessage() throws JMSException
@@ -240,32 +268,56 @@
    // FIXME - is this really OK, probably not
    public void commit() throws JMSException
    {
-      Session session = getSession();
-      if (info.isTransacted() == false)
-         throw new IllegalStateException("Session is not transacted");
-      if (trace)
-         log.trace("Commit session " + this);
-      session.commit();
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (info.isTransacted() == false)
+            throw new IllegalStateException("Session is not transacted");
+         if (trace)
+            log.trace("Commit session " + this);
+         session.commit();
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void rollback() throws JMSException
    {
-      Session session = getSession();
-      if (info.isTransacted() == false)
-         throw new IllegalStateException("Session is not transacted");
-      if (trace)
-         log.trace("Rollback session " + this);
-      session.rollback();
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (info.isTransacted() == false)
+            throw new IllegalStateException("Session is not transacted");
+         if (trace)
+            log.trace("Rollback session " + this);
+         session.rollback();
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void recover() throws JMSException
    {
-      Session session = getSession();
-      if (info.isTransacted())
-         throw new IllegalStateException("Session is transacted");
-      if (trace)
-         log.trace("Recover session " + this);
-      session.recover();
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (info.isTransacted())
+            throw new IllegalStateException("Session is transacted");
+         if (trace)
+            log.trace("Recover session " + this);
+         session.recover();
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    // --- TopicSession API
@@ -288,28 +340,44 @@
 
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException
    {
-      TopicSession session = getTopicSession();
-      if (trace)
-         log.trace("createSubscriber " + session + " topic=" + topic);
-      TopicSubscriber result = session.createSubscriber(topic);
-      result = new JmsTopicSubscriber(result, this);
-      if (trace)
-         log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         TopicSession session = getTopicSession();
+         if (trace)
+            log.trace("createSubscriber " + session + " topic=" + topic);
+         TopicSubscriber result = session.createSubscriber(topic);
+         result = new JmsTopicSubscriber(result, this);
+         if (trace)
+            log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
    {
-      TopicSession session = getTopicSession();
-      if (trace)
-         log.trace("createSubscriber " + session + " topic=" + topic + " selector=" + messageSelector + " noLocal=" + noLocal);
-      TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal);
-      result = new JmsTopicSubscriber(result, this);
-      if (trace)
-         log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         TopicSession session = getTopicSession();
+         if (trace)
+            log.trace("createSubscriber " + session + " topic=" + topic + " selector=" + messageSelector + " noLocal=" + noLocal);
+         TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal);
+         result = new JmsTopicSubscriber(result, this);
+         if (trace)
+            log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
@@ -319,41 +387,66 @@
          throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");         
       }
       
-      Session session = getSession();
-      if (trace)
-         log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name);
-      TopicSubscriber result = session.createDurableSubscriber(topic, name);
-      result = new JmsTopicSubscriber(result, this);
-      if (trace)
-         log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name);
+         TopicSubscriber result = session.createDurableSubscriber(topic, name);
+         result = new JmsTopicSubscriber(result, this);
+         if (trace)
+            log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
          throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name + " selector=" + messageSelector + " noLocal=" + noLocal);
-      TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal);
-      result = new JmsTopicSubscriber(result, this);
-      if (trace)
-         log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name + " selector=" + messageSelector + " noLocal=" + noLocal);
+         TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal);
+         result = new JmsTopicSubscriber(result, this);
+         if (trace)
+            log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public TopicPublisher createPublisher(Topic topic) throws JMSException
    {
-      TopicSession session = getTopicSession();
-      if (trace)
-         log.trace("createPublisher " + session + " topic=" + topic);
-      TopicPublisher result = session.createPublisher(topic);
-      if (trace)
-         log.trace("createdPublisher " + session + " publisher=" + result);
-      addProducer(result);
-      return result;
+      lock();
+      try
+      {
+         TopicSession session = getTopicSession();
+         if (trace)
+            log.trace("createPublisher " + session + " topic=" + topic);
+         TopicPublisher result = session.createPublisher(topic);
+         result = new JmsTopicPublisher(result, this);
+         if (trace)
+            log.trace("createdPublisher " + session + " publisher=" + result);
+         addProducer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public TemporaryTopic createTemporaryTopic() throws JMSException
@@ -363,14 +456,22 @@
          throw new IllegalStateException("Cannot create temporary topic for javax.jms.QueueSession");         
       }
       
-      Session session = getSession();
-      if (trace)
-         log.trace("createTemporaryTopic " + session);
-      TemporaryTopic temp = session.createTemporaryTopic();
-      if (trace)
-         log.trace("createdTemporaryTopic " + session + " temp=" + temp);
-      sf.addTemporaryTopic(temp);
-      return temp;
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createTemporaryTopic " + session);
+         TemporaryTopic temp = session.createTemporaryTopic();
+         if (trace)
+            log.trace("createdTemporaryTopic " + session + " temp=" + temp);
+         sf.addTemporaryTopic(temp);
+         return temp;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void unsubscribe(String name) throws JMSException
@@ -380,10 +481,18 @@
          throw new IllegalStateException("Cannot unsubscribe for javax.jms.QueueSession");         
       }
 
-      Session session = getSession();
-      if (trace)
-         log.trace("unsubscribe " + session + " name=" + name);
-      session.unsubscribe(name);
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("unsubscribe " + session + " name=" + name);
+         session.unsubscribe(name);
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    //--- QueueSession API
@@ -436,40 +545,65 @@
 
    public QueueReceiver createReceiver(Queue queue) throws JMSException
    {
-      QueueSession session = getQueueSession();
-      if (trace)
-         log.trace("createReceiver " + session + " queue=" + queue);
-      QueueReceiver result = session.createReceiver(queue);
-      result = new JmsQueueReceiver(result, this);
-      if (trace)
-         log.trace("createdReceiver " + session + " receiver=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         QueueSession session = getQueueSession();
+         if (trace)
+            log.trace("createReceiver " + session + " queue=" + queue);
+         QueueReceiver result = session.createReceiver(queue);
+         result = new JmsQueueReceiver(result, this);
+         if (trace)
+            log.trace("createdReceiver " + session + " receiver=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
    {
-      QueueSession session = getQueueSession();
-      if (trace)
-         log.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector);
-      QueueReceiver result = session.createReceiver(queue, messageSelector);
-      result = new JmsQueueReceiver(result, this);
-      if (trace)
-         log.trace("createdReceiver " + session + " receiver=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         QueueSession session = getQueueSession();
+         if (trace)
+            log.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector);
+         QueueReceiver result = session.createReceiver(queue, messageSelector);
+         result = new JmsQueueReceiver(result, this);
+         if (trace)
+            log.trace("createdReceiver " + session + " receiver=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public QueueSender createSender(Queue queue) throws JMSException
    {
-      QueueSession session = getQueueSession();
-      if (trace)
-         log.trace("createSender " + session + " queue=" + queue);
-      QueueSender result = session.createSender(queue);
-      if (trace)
-         log.trace("createdSender " + session + " sender=" + result);
-      addProducer(result);
-      return result;
+      lock();
+      try
+      {
+         QueueSession session = getQueueSession();
+         if (trace)
+            log.trace("createSender " + session + " queue=" + queue);
+         QueueSender result = session.createSender(queue);
+         result = new JmsQueueSender(result, this);
+         if (trace)
+            log.trace("createdSender " + session + " sender=" + result);
+         addProducer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -479,68 +613,110 @@
          throw new IllegalStateException("Cannot create temporary queue for javax.jms.TopicSession");
          
       }
-      Session session = getSession();
-      if (trace)
-         log.trace("createTemporaryQueue " + session);
-      TemporaryQueue temp = session.createTemporaryQueue();
-      if (trace)
-         log.trace("createdTemporaryQueue " + session + " temp=" + temp);
-      sf.addTemporaryQueue(temp);
-      return temp;
+
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createTemporaryQueue " + session);
+         TemporaryQueue temp = session.createTemporaryQueue();
+         if (trace)
+            log.trace("createdTemporaryQueue " + session + " temp=" + temp);
+         sf.addTemporaryQueue(temp);
+         return temp;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    // -- JMS 1.1
 
    public MessageConsumer createConsumer(Destination destination) throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createConsumer " + session + " dest=" + destination);
-      MessageConsumer result = session.createConsumer(destination);
-      result = new JmsMessageConsumer(result, this);
-      if (trace)
-         log.trace("createdConsumer " + session + " consumer=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createConsumer " + session + " dest=" + destination);
+         MessageConsumer result = session.createConsumer(destination);
+         result = new JmsMessageConsumer(result, this);
+         if (trace)
+            log.trace("createdConsumer " + session + " consumer=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector);
-      MessageConsumer result = session.createConsumer(destination, messageSelector);
-      result = new JmsMessageConsumer(result, this);
-      if (trace)
-         log.trace("createdConsumer " + session + " consumer=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector);
+         MessageConsumer result = session.createConsumer(destination, messageSelector);
+         result = new JmsMessageConsumer(result, this);
+         if (trace)
+            log.trace("createdConsumer " + session + " consumer=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
          throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector + " noLocal=" + noLocal);
-      MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal);
-      result = new JmsMessageConsumer(result, this);
-      if (trace)
-         log.trace("createdConsumer " + session + " consumer=" + result);
-      addConsumer(result);
-      return result;
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector + " noLocal=" + noLocal);
+         MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal);
+         result = new JmsMessageConsumer(result, this);
+         if (trace)
+            log.trace("createdConsumer " + session + " consumer=" + result);
+         addConsumer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public MessageProducer createProducer(Destination destination) throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createProducer " + session + " dest=" + destination);
-      MessageProducer result = getSession().createProducer(destination);
-      if (trace)
-         log.trace("createdProducer " + session + " producer=" + result);
-      addProducer(result);
-      return result;
+      lock();
+      try
+      {
+         Session session = getSession();
+         if (trace)
+            log.trace("createProducer " + session + " dest=" + destination);
+         MessageProducer result = getSession().createProducer(destination);
+         result = new JmsMessageProducer(result, this);
+         if (trace)
+            log.trace("createdProducer " + session + " producer=" + result);
+         addProducer(result);
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
    public int getAcknowledgeMode() throws JMSException
@@ -617,10 +793,10 @@
          {
             for (Iterator i = producers.iterator(); i.hasNext();)
             {
-               MessageProducer producer = (MessageProducer) i.next();
+               JmsMessageProducer producer = (JmsMessageProducer) i.next();
                try
                {
-                  producer.close();
+                  producer.closeProducer();
                }
                catch (Throwable t)
                {

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsSessionFactoryImpl.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsSessionFactoryImpl.java	2008-04-08 13:46:40 UTC (rev 71789)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsSessionFactoryImpl.java	2008-04-08 14:00:11 UTC (rev 71790)
@@ -45,6 +45,7 @@
 import javax.resource.spi.ManagedConnectionFactory;
 
 import org.jboss.logging.Logger;
+import org.jboss.resource.connectionmanager.JTATransactionChecker;
 
 /**
  * Implements the JMS Connection API and produces {@link JmsSession} objects.
@@ -387,16 +388,33 @@
             if (trace)
                log.trace("Allocating session for " + this + " with request info=" + info);
             JmsSession session = (JmsSession) cm.allocateConnection(mcf, info);
-            if (trace)
-               log.trace("Allocated  " + this + " session=" + session);
-            session.setJmsSessionFactory(this);
-            if (started)
-               session.start();
-            sessions.add(session);
-            return session;
+            try
+            {
+               if (trace)
+                  log.trace("Allocated  " + this + " session=" + session);
+               session.setJmsSessionFactory(this);
+               if (started)
+                  session.start();
+               sessions.add(session);
+               return session;
+            }
+            catch (Throwable t)
+            {
+               try
+               {
+                  session.close();
+               }
+               catch (Throwable ignored)
+               {
+               }
+               if (t instanceof Exception)
+                  throw (Exception) t;
+               else
+                  throw new RuntimeException("Unexpected error: ", t);
+            }
          }
       }
-      catch (ResourceException e)
+      catch (Exception e)
       {
          log.error("could not create session", e);
          
@@ -411,5 +429,29 @@
    {
       if (closed)
          throw new IllegalStateException("The connection is closed");
+      checkTransactionActive();
    }
+   
+   /**
+    * Check whether a tranasction is active
+    *
+    * @throws IllegalStateException if the transaction is not active, preparing, prepared or committing or for any error in the transaction manager
+    */
+   protected void checkTransactionActive() throws IllegalStateException
+   {
+      if (cm == null)
+         throw new IllegalStateException("No connection manager");
+      try
+      {
+         if (cm instanceof JTATransactionChecker)
+            ((JTATransactionChecker) cm).checkTransactionActive();
+      }
+      catch (Exception e)
+      {
+         IllegalStateException ex = new IllegalStateException("Transaction not active");
+         ex.initCause(e);
+         ex.setLinkedException(e);
+         throw ex;
+      }
+   }
 }

Added: trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java	                        (rev 0)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java	2008-04-08 14:00:11 UTC (rev 71790)
@@ -0,0 +1,132 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.resource.adapter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+import org.jboss.logging.Logger;
+
+/**
+ * JmsQueueSender.
+ * 
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @version $Revision: 1.1 $
+ */
+public class JmsTopicPublisher extends JmsMessageProducer implements TopicPublisher
+{
+   private static final Logger log = Logger.getLogger(JmsTopicPublisher.class);
+   
+   /** Whether trace is enabled */
+   private boolean trace = log.isTraceEnabled();
+
+   /**
+    * Create a new wrapper
+    * 
+    * @param producer the producer
+    * @param session the session
+    */
+   public JmsTopicPublisher(TopicPublisher producer, JmsSession session)
+   {
+      super(producer, session);
+   }
+
+   public Topic getTopic() throws JMSException
+   {
+      return ((TopicPublisher) producer).getTopic();
+   }
+
+   public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+   {
+      session.lock();
+      try
+      {
+      }
+      finally
+      {
+         session.unlock();
+      }
+      if (trace)
+         log.trace("send " + this  + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+      checkState();
+      ((TopicPublisher) producer).publish(message, deliveryMode, priority, timeToLive);
+      if (trace)
+         log.trace("sent " + this + " result=" + message);
+   }
+
+   public void publish(Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " message=" + message);
+         checkState();
+         ((TopicPublisher) producer).publish(message);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   public void publish(Topic destination, Message message, int deliveryMode, int priority, long timeToLive)
+         throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+         checkState();
+         ((TopicPublisher) producer).publish(destination, message, deliveryMode, priority, timeToLive);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   public void publish(Topic destination, Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message);
+         checkState();
+         ((TopicPublisher) producer).publish(destination, message);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+}

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicSubscriber.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicSubscriber.java	2008-04-08 13:46:40 UTC (rev 71789)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicSubscriber.java	2008-04-08 14:00:11 UTC (rev 71790)
@@ -46,11 +46,13 @@
 
    public boolean getNoLocal() throws JMSException
    {
+      checkState();
       return ((TopicSubscriber) consumer).getNoLocal();
    }
 
    public Topic getTopic() throws JMSException
    {
+      checkState();
       return ((TopicSubscriber) consumer).getTopic();
    }
 }

Added: trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java	                        (rev 0)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/JmsXAResource.java	2008-04-08 14:00:11 UTC (rev 71790)
@@ -0,0 +1,153 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.resource.adapter.jms;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+/**
+ * JmsXAResource.
+ * 
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @version $Revision: 1.1 $
+ */
+public class JmsXAResource implements XAResource
+{
+   /** The managed connection */
+   private JmsManagedConnection managedConnection;
+   
+   /** The resource */
+   private XAResource xaResource;
+
+   /**
+    * Create a new JmsXAResource.
+    * 
+    * @param managedConnection the managed connection
+    * @param xaResource the xa resource
+    */
+   public JmsXAResource(JmsManagedConnection managedConnection, XAResource xaResource)
+   {
+      this.managedConnection = managedConnection;
+      this.xaResource = xaResource;
+   }
+
+   public void start(Xid xid, int flags) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.start(xid, flags);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public void end(Xid xid, int flags) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.end(xid, flags);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public int prepare(Xid xid) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         return xaResource.prepare(xid);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public void commit(Xid xid, boolean onePhase) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.commit(xid, onePhase);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public void rollback(Xid xid) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.rollback(xid);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public void forget(Xid xid) throws XAException
+   {
+      managedConnection.lock();
+      try
+      {
+         xaResource.forget(xid);
+      }
+      finally
+      {
+         managedConnection.unlock();
+      }
+   }
+
+   public boolean isSameRM(XAResource xaRes) throws XAException
+   {
+      return xaResource.isSameRM(xaRes);
+   }
+
+   public Xid[] recover(int flag) throws XAException
+   {
+      return xaResource.recover(flag);
+   }
+
+   public int getTransactionTimeout() throws XAException
+   {
+      return xaResource.getTransactionTimeout();
+   }
+
+   public boolean setTransactionTimeout(int seconds) throws XAException
+   {
+      return xaResource.setTransactionTimeout(seconds);
+   }
+   
+   
+}




More information about the jboss-cvs-commits mailing list