[jboss-cvs] JBossAS SVN: r71467 - in branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter: jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Mar 31 05:23:50 EDT 2008


Author: vicky.kak at jboss.com
Date: 2008-03-31 05:23:50 -0400 (Mon, 31 Mar 2008)
New Revision: 71467

Added:
   branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java
Modified:
   branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java
   branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java
   branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
   branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java
   branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java
Log:
[JBPAPP-662]Add a lock for the session to avoid racing between jms activity and asynchronous rollback

Modified: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java	2008-03-31 09:22:02 UTC (rev 71466)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java	2008-03-31 09:23:50 UTC (rev 71467)
@@ -34,6 +34,8 @@
 import java.util.Iterator;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.resource.ResourceException;
 import javax.resource.spi.ConnectionEvent;
@@ -66,6 +68,8 @@
    private final int transactionIsolation;
    private final boolean readOnly;
 
+   private ReentrantLock lock = new ReentrantLock();
+   
    private final Collection cels = new ArrayList();
    private final Set handles = new HashSet();
    private PreparedStatementCache psCache = null;
@@ -194,6 +198,11 @@
                mcf.log.warn("Error resetting transaction isolation ", e);
             }
          }
+         // I'm recreating the lock object when we return to the pool
+         // because it looks too nasty to expect the connection handle
+         // to unlock properly in certain race conditions
+         // where the dissociation of the managed connection is "random".
+         lock = new ReentrantLock();
       }
    }
 
@@ -241,6 +250,35 @@
       }
    }
 
+   protected void lock()
+   {
+      lock.lock();
+   }
+
+   protected void tryLock() throws SQLException
+   {
+      int tryLock = mcf.getUseTryLock();
+      if (tryLock <= 0)
+      {
+         lock();
+         return;
+      }
+      try
+      {
+         if (lock.tryLock(tryLock, TimeUnit.SECONDS) == false)
+            throw new SQLException("Unable to obtain lock in " + tryLock + " seconds: " + this);
+      }
+      catch (InterruptedException e)
+      {
+         throw new SQLException("Interrupted attempting lock: " + this);
+      }
+   }
+   
+   protected void unlock()
+   {
+      lock.unlock();
+   }
+   
    void closeHandle(WrappedConnection handle)
    {
       synchronized (stateLock)

Modified: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java	2008-03-31 09:22:02 UTC (rev 71466)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java	2008-03-31 09:23:50 UTC (rev 71467)
@@ -27,12 +27,15 @@
 import java.util.Iterator;
 import java.util.Set;
 import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jms.Connection;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.QueueConnection;
 import javax.jms.QueueSession;
+import javax.jms.ResourceAllocationException;
 import javax.jms.Session;
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
@@ -141,6 +144,8 @@
    private String pwd;
    private boolean isDestroyed;
 
+   private ReentrantLock lock = new ReentrantLock();
+   
    // Physical JMS connection stuff
    private Connection con;
    private Session session;
@@ -332,6 +337,13 @@
 
       // destory handles
       destroyHandles();
+      
+      // I'm recreating the lock object when we return to the pool
+      // because it looks too nasty to expect the connection handle
+      // to unlock properly in certain race conditions
+      // where the dissociation of the managed connection is "random".
+      lock = new ReentrantLock();
+      
    }
 
    /**
@@ -360,6 +372,35 @@
             ("ManagedConnection in an illegal state");
    }
 
+   protected void lock()
+   {
+      lock.lock();
+   }
+
+   protected void tryLock() throws JMSException
+   {
+      int tryLock = mcf.getUseTryLock();
+      if (tryLock <= 0)
+      {
+         lock();
+         return;
+      }
+      try
+      {
+         if (lock.tryLock(tryLock, TimeUnit.SECONDS) == false)
+            throw new ResourceAllocationException("Unable to obtain lock in " + tryLock + " seconds: " + this);
+      }
+      catch (InterruptedException e)
+      {
+         throw new ResourceAllocationException("Interrupted attempting lock: " + this);
+      }
+   }
+   
+   protected void unlock()
+   {
+      lock.unlock();
+   }
+
    /**
     * Add a connection event listener.
     *
@@ -412,6 +453,7 @@
       if (log.isTraceEnabled())
          log.trace("XAResource=" + xaResource);
 
+      xaResource = new JmsXAResource(this, xaResource);
       return xaResource;
    }
 

Modified: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java	2008-03-31 09:22:02 UTC (rev 71466)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java	2008-03-31 09:23:50 UTC (rev 71467)
@@ -57,6 +57,9 @@
 
    /** For local access. */
    private JMSProviderAdapter adapter;
+   
+   /** The try lock */
+   private int useTryLock = 0;
 
    public JmsManagedConnectionFactory()
    {
@@ -314,6 +317,26 @@
       return adapter;
    }
 
+   /**
+    * Get the useTryLock.
+    * 
+    * @return the useTryLock.
+    */
+   public int getUseTryLock()
+   {
+      return useTryLock;
+   }
+
+   /**
+    * Set the useTryLock.
+    * 
+    * @param useTryLock the useTryLock.
+    */
+   public void setUseTryLock(int useTryLock)
+   {
+      this.useTryLock = useTryLock;
+   }
+   
    private ConnectionRequestInfo getInfo(ConnectionRequestInfo info)
    {
       if (info == null)

Modified: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java	2008-03-31 09:22:02 UTC (rev 71466)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java	2008-03-31 09:23:50 UTC (rev 71467)
@@ -81,63 +81,106 @@
       }
    }
    
+   void checkState() throws JMSException
+   {
+      session.checkTransactionActive();
+   }
+   
    public MessageListener getMessageListener() throws JMSException
    {
+	  checkState(); 
       session.checkStrict();
       return consumer.getMessageListener();
    }
    
    public String getMessageSelector() throws JMSException
    {
+	  checkState(); 
       return consumer.getMessageSelector();
    }
    
    public Message receive() throws JMSException
    {
-      if (trace)
-         log.trace("receive " + this);
-      Message message = consumer.receive();
-      if (trace)
-         log.trace("received " + this + " result=" + message);
-      if (message == null)
-         return null;
-      else
-         return wrapMessage(message);
+	  session.lock();
+	  try
+	  {
+	      if (trace)
+	         log.trace("receive " + this);
+	      checkState();
+	      Message message = consumer.receive();
+	      if (trace)
+	         log.trace("received " + this + " result=" + message);
+	      if (message == null)
+	         return null;
+	      else
+	         return wrapMessage(message);
+	  }
+	  finally
+	  {
+		  session.unlock();
+	  }
    }
 
    public Message receive(long timeout) throws JMSException
    {
-      if (trace)
-         log.trace("receive " + this + " timeout=" + timeout);
-      Message message = consumer.receive(timeout);
-      if (trace)
-         log.trace("received " + this + " result=" + message);
-      if (message == null)
-         return null;
-      else
-         return wrapMessage(message);
+	  session.lock();
+	  try
+	  {
+	      if (trace)
+	         log.trace("receive " + this + " timeout=" + timeout);
+	      checkState();
+	      Message message = consumer.receive(timeout);
+	      if (trace)
+	         log.trace("received " + this + " result=" + message);
+	      if (message == null)
+	         return null;
+	      else
+	         return wrapMessage(message);
+	  }
+	  finally
+	  {
+		  session.unlock();
+	  }
    }
 
    public Message receiveNoWait() throws JMSException
    {
-      if (trace)
-         log.trace("receiveNoWait " + this);
-      Message message = consumer.receiveNoWait();
-      if (trace)
-         log.trace("received " + this + " result=" + message);
-      if (message == null)
-         return null;
-      else
-         return wrapMessage(message);
+	  session.lock();
+	  try
+	  {
+	      if (trace)
+	         log.trace("receiveNoWait " + this);
+	      checkState();
+	      Message message = consumer.receiveNoWait();
+	      if (trace)
+	         log.trace("received " + this + " result=" + message);
+	      if (message == null)
+	         return null;
+	      else
+	         return wrapMessage(message);
+	  }
+	  finally
+	  {
+		  session.unlock();		  
+	  }
    }
    
    public void setMessageListener(MessageListener listener) throws JMSException
    {
-      session.checkStrict();
-      if (listener == null)
-         consumer.setMessageListener(null);
-      else
-         consumer.setMessageListener(wrapMessageListener(listener));
+	  session.lock();
+	  try
+	  {
+		  checkState();
+	      session.checkStrict();
+	      if (listener == null)
+	         consumer.setMessageListener(null);
+	      else
+	         consumer.setMessageListener(wrapMessageListener(listener));
+	  }
+	  finally
+	  {
+		  session.unlock();
+	  }
    }
 
    void closeConsumer() throws JMSException

Added: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java	                        (rev 0)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java	2008-03-31 09:23:50 UTC (rev 71467)
@@ -0,0 +1,216 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.resource.adapter.jms;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+import org.jboss.logging.Logger;
+
+/**
+ * JmsMessageProducer.
+ * 
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @version $Revision: 1.1 $
+ */
+public class JmsMessageProducer implements MessageProducer
+{
+   private static final Logger log = Logger.getLogger(JmsMessageConsumer.class);
+
+   /** The wrapped message producer */
+   MessageProducer producer;
+   
+   /** The session for this consumer */
+   JmsSession session;
+   
+   /** Whether trace is enabled */
+   private boolean trace = log.isTraceEnabled();
+
+   /**
+    * Create a new wrapper
+    * 
+    * @param producer the producer
+    * @param session the session
+    */
+   public JmsMessageProducer(MessageProducer producer, JmsSession session)
+   {
+      this.producer = producer;
+      this.session = session;
+      
+      if (trace)
+         log.trace("new JmsMessageProducer " + this + " producer=" + producer + " session=" + session);
+   }
+
+   public void close() throws JMSException
+   {
+      if (trace)
+         log.trace("close " + this);
+      try
+      {
+         closeProducer();
+      }
+      finally
+      {
+         session.removeProducer(this);
+      }
+   }
+
+   public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
+         throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+         checkState();
+         producer.send(destination, message, deliveryMode, priority, timeToLive);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   public void send(Destination destination, Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " destination=" + destination + " message=" + message);
+         checkState();
+         producer.send(destination, message);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+         checkState();
+         producer.send(message, deliveryMode, priority, timeToLive);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   public void send(Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+            log.trace("send " + this + " message=" + message);
+         checkState();
+         producer.send(message);
+         if (trace)
+            log.trace("sent " + this + " result=" + message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   public int getDeliveryMode() throws JMSException
+   {
+      return producer.getDeliveryMode();
+   }
+
+   public Destination getDestination() throws JMSException
+   {
+      return producer.getDestination();
+   }
+
+   public boolean getDisableMessageID() throws JMSException
+   {
+      return producer.getDisableMessageID();
+   }
+
+   public boolean getDisableMessageTimestamp() throws JMSException
+   {
+      return producer.getDisableMessageTimestamp();
+   }
+
+   public int getPriority() throws JMSException
+   {
+      return producer.getPriority();
+   }
+
+   public long getTimeToLive() throws JMSException
+   {
+      return producer.getTimeToLive();
+   }
+
+   public void setDeliveryMode(int deliveryMode) throws JMSException
+   {
+      producer.setDeliveryMode(deliveryMode);
+   }
+
+   public void setDisableMessageID(boolean value) throws JMSException
+   {
+      producer.setDisableMessageID(value);
+   }
+
+   public void setDisableMessageTimestamp(boolean value) throws JMSException
+   {
+      producer.setDisableMessageTimestamp(value);
+   }
+
+   public void setPriority(int defaultPriority) throws JMSException
+   {
+      producer.setPriority(defaultPriority);
+   }
+
+   public void setTimeToLive(long timeToLive) throws JMSException
+   {
+      producer.setTimeToLive(timeToLive);
+   }
+
+   void checkState() throws JMSException
+   {
+      session.checkTransactionActive();
+   }
+
+   void closeProducer() throws JMSException
+   {
+      producer.close();
+   }
+}

Modified: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java	2008-03-31 09:22:02 UTC (rev 71466)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java	2008-03-31 09:23:50 UTC (rev 71467)
@@ -41,6 +41,7 @@
 import javax.jms.QueueReceiver;
 import javax.jms.QueueSender;
 import javax.jms.QueueSession;
+import javax.jms.ResourceAllocationException;
 import javax.jms.Session;
 import javax.jms.StreamMessage;
 import javax.jms.TemporaryQueue;
@@ -73,7 +74,7 @@
    private JmsConnectionRequestInfo info;
 
    /** The session factory for this session */
-   private JmsSessionFactory sf;
+   private JmsSessionFactoryImpl sf;
    
    /** The message consumers */
    private HashSet consumers = new HashSet();
@@ -97,11 +98,29 @@
          log.trace("new JmsSession " + this + " mc=" + mc + " cri=" + info);
    }
 
-   public void setJmsSessionFactory(JmsSessionFactory sf)
+   public void setJmsSessionFactory(JmsSessionFactoryImpl sf)
    {
       this.sf = sf;
    }
    
+   protected void lock() throws JMSException
+   {
+      JmsManagedConnection mc = this.mc;
+      if (mc != null)
+         mc.tryLock();
+      else
+         throw new ResourceAllocationException("Connection is not associated with a managed connection. " + this);
+   }
+
+   protected void unlock()
+   {
+      JmsManagedConnection mc = this.mc;
+      if (mc != null)
+         mc.unlock();
+      // We recreate the lock when returned to the pool
+      // so missing the unlock after disassociation is not important
+   }
+   
    /**
     * Ensure that the session is opened.
     * 
@@ -115,12 +134,20 @@
       if (mc == null)
          throw new IllegalStateException("The session is closed");
       
+      checkTransactionActive();
+      
       Session session = mc.getSession();
       if (trace)
          log.trace("getSession " + session + " for " + this);
       return session;
    }
 
+   void checkTransactionActive() throws IllegalStateException
+   {
+      if (sf != null)
+         sf.checkTransactionActive();
+   }
+   
    // ---- Session API
 
    public BytesMessage createBytesMessage() throws JMSException
@@ -240,32 +267,56 @@
    // FIXME - is this really OK, probably not
    public void commit() throws JMSException
    {
-      Session session = getSession();
-      if (info.isTransacted() == false)
-         throw new IllegalStateException("Session is not transacted");
-      if (trace)
-         log.trace("Commit session " + this);
-      session.commit();
+	  lock();
+	  try
+	  {
+	      Session session = getSession();
+	      if (info.isTransacted() == false)
+	         throw new IllegalStateException("Session is not transacted");
+	      if (trace)
+	         log.trace("Commit session " + this);
+	      session.commit();
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public void rollback() throws JMSException
    {
-      Session session = getSession();
-      if (info.isTransacted() == false)
-         throw new IllegalStateException("Session is not transacted");
-      if (trace)
-         log.trace("Rollback session " + this);
-      session.rollback();
+	  lock();
+	  try
+	  {
+	      Session session = getSession();
+	      if (info.isTransacted() == false)
+	         throw new IllegalStateException("Session is not transacted");
+	      if (trace)
+	         log.trace("Rollback session " + this);
+	      session.rollback();
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public void recover() throws JMSException
    {
-      Session session = getSession();
-      if (info.isTransacted())
-         throw new IllegalStateException("Session is transacted");
-      if (trace)
-         log.trace("Recover session " + this);
-      session.recover();
+	  lock();
+	  try
+	  {
+	      Session session = getSession();
+	      if (info.isTransacted())
+	         throw new IllegalStateException("Session is transacted");
+	      if (trace)
+	         log.trace("Recover session " + this);
+	      session.recover();
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    // --- TopicSession API
@@ -288,72 +339,113 @@
 
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException
    {
-      TopicSession session = getTopicSession();
-      if (trace)
-         log.trace("createSubscriber " + session + " topic=" + topic);
-      TopicSubscriber result = session.createSubscriber(topic);
-      result = new JmsTopicSubscriber(result, this);
-      if (trace)
-         log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
-      addConsumer(result);
-      return result;
+	  lock();
+	  try
+	  {
+	      TopicSession session = getTopicSession();
+	      if (trace)
+	         log.trace("createSubscriber " + session + " topic=" + topic);
+	      TopicSubscriber result = session.createSubscriber(topic);
+	      result = new JmsTopicSubscriber(result, this);
+	      if (trace)
+	         log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
+	      addConsumer(result);
+	      return result;
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
    {
-      TopicSession session = getTopicSession();
-      if (trace)
-         log.trace("createSubscriber " + session + " topic=" + topic + " selector=" + messageSelector + " noLocal=" + noLocal);
-      TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal);
-      result = new JmsTopicSubscriber(result, this);
-      if (trace)
-         log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
-      addConsumer(result);
-      return result;
+	  lock();
+	  try
+	  {
+	      TopicSession session = getTopicSession();
+	      if (trace)
+	         log.trace("createSubscriber " + session + " topic=" + topic + " selector=" + messageSelector + " noLocal=" + noLocal);
+	      TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal);
+	      result = new JmsTopicSubscriber(result, this);
+	      if (trace)
+	         log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
+	      addConsumer(result);
+	      return result;
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
    {
-      if(info.getType() == JmsConnectionFactory.QUEUE)
-      {
-         throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");         
-      }
-      
-      Session session = getSession();
-      if (trace)
-         log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name);
-      TopicSubscriber result = session.createDurableSubscriber(topic, name);
-      result = new JmsTopicSubscriber(result, this);
-      if (trace)
-         log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
-      addConsumer(result);
-      return result;
+	  lock();
+	  try
+	  {
+	      if(info.getType() == JmsConnectionFactory.QUEUE)
+	      {
+	         throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");         
+	      }
+	      
+	      Session session = getSession();
+	      if (trace)
+	         log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name);
+	      TopicSubscriber result = session.createDurableSubscriber(topic, name);
+	      result = new JmsTopicSubscriber(result, this);
+	      if (trace)
+	         log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
+	      addConsumer(result);
+	      return result;
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
          throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name + " selector=" + messageSelector + " noLocal=" + noLocal);
-      TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal);
-      result = new JmsTopicSubscriber(result, this);
-      if (trace)
-         log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
-      addConsumer(result);
-      return result;
+	  lock();
+	  try
+	  {
+	      Session session = getSession();
+	      if (trace)
+	         log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name + " selector=" + messageSelector + " noLocal=" + noLocal);
+	      TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal);
+	      result = new JmsTopicSubscriber(result, this);
+	      if (trace)
+	         log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
+	      addConsumer(result);
+	      return result;
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public TopicPublisher createPublisher(Topic topic) throws JMSException
    {
-      TopicSession session = getTopicSession();
-      if (trace)
-         log.trace("createPublisher " + session + " topic=" + topic);
-      TopicPublisher result = session.createPublisher(topic);
-      if (trace)
-         log.trace("createdPublisher " + session + " publisher=" + result);
-      addProducer(result);
-      return result;
+	  lock();
+	  try
+	  {
+	      TopicSession session = getTopicSession();
+	      if (trace)
+	         log.trace("createPublisher " + session + " topic=" + topic);
+	      TopicPublisher result = session.createPublisher(topic);
+	      result = new JmsTopicPublisher(result, this);
+	      if (trace)
+	         log.trace("createdPublisher " + session + " publisher=" + result);
+	      addProducer(result);
+	      return result;
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public TemporaryTopic createTemporaryTopic() throws JMSException
@@ -363,14 +455,22 @@
          throw new IllegalStateException("Cannot create temporary topic for javax.jms.QueueSession");         
       }
       
-      Session session = getSession();
-      if (trace)
-         log.trace("createTemporaryTopic " + session);
-      TemporaryTopic temp = session.createTemporaryTopic();
-      if (trace)
-         log.trace("createdTemporaryTopic " + session + " temp=" + temp);
-      sf.addTemporaryTopic(temp);
-      return temp;
+      lock();
+      try
+      {
+	      Session session = getSession();
+	      if (trace)
+	         log.trace("createTemporaryTopic " + session);
+	      TemporaryTopic temp = session.createTemporaryTopic();
+	      if (trace)
+	         log.trace("createdTemporaryTopic " + session + " temp=" + temp);
+	      sf.addTemporaryTopic(temp);
+	      return temp;
+      }
+      finally
+      {
+    	  unlock();
+      }
    }
 
    public void unsubscribe(String name) throws JMSException
@@ -379,11 +479,19 @@
       {
          throw new IllegalStateException("Cannot unsubscribe for javax.jms.QueueSession");         
       }
-
-      Session session = getSession();
-      if (trace)
-         log.trace("unsubscribe " + session + " name=" + name);
-      session.unsubscribe(name);
+      
+      lock();
+      try
+      {
+	      Session session = getSession();
+	      if (trace)
+	         log.trace("unsubscribe " + session + " name=" + name);
+	      session.unsubscribe(name);
+      }
+      finally
+      {
+    	  unlock();
+      }
    }
 
    //--- QueueSession API
@@ -436,40 +544,65 @@
 
    public QueueReceiver createReceiver(Queue queue) throws JMSException
    {
-      QueueSession session = getQueueSession();
-      if (trace)
-         log.trace("createReceiver " + session + " queue=" + queue);
-      QueueReceiver result = session.createReceiver(queue);
-      result = new JmsQueueReceiver(result, this);
-      if (trace)
-         log.trace("createdReceiver " + session + " receiver=" + result);
-      addConsumer(result);
-      return result;
+	  lock();
+	  try
+	  {
+	      QueueSession session = getQueueSession();
+	      if (trace)
+	         log.trace("createReceiver " + session + " queue=" + queue);
+	      QueueReceiver result = session.createReceiver(queue);
+	      result = new JmsQueueReceiver(result, this);
+	      if (trace)
+	         log.trace("createdReceiver " + session + " receiver=" + result);
+	      addConsumer(result);
+	      return result;
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
    {
-      QueueSession session = getQueueSession();
-      if (trace)
-         log.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector);
-      QueueReceiver result = session.createReceiver(queue, messageSelector);
-      result = new JmsQueueReceiver(result, this);
-      if (trace)
-         log.trace("createdReceiver " + session + " receiver=" + result);
-      addConsumer(result);
-      return result;
+	  lock();
+	  try
+	  {
+	      QueueSession session = getQueueSession();
+	      if (trace)
+	         log.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector);
+	      QueueReceiver result = session.createReceiver(queue, messageSelector);
+	      result = new JmsQueueReceiver(result, this);
+	      if (trace)
+	         log.trace("createdReceiver " + session + " receiver=" + result);
+	      addConsumer(result);
+	      return result;
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public QueueSender createSender(Queue queue) throws JMSException
    {
-      QueueSession session = getQueueSession();
-      if (trace)
-         log.trace("createSender " + session + " queue=" + queue);
-      QueueSender result = session.createSender(queue);
-      if (trace)
-         log.trace("createdSender " + session + " sender=" + result);
-      addProducer(result);
-      return result;
+	  lock();
+	  try
+	  {
+	      QueueSession session = getQueueSession();
+	      if (trace)
+	         log.trace("createSender " + session + " queue=" + queue);
+	      QueueSender result = session.createSender(queue);
+	      result = new JmsQueueSender(result, this);
+	      if (trace)
+	         log.trace("createdSender " + session + " sender=" + result);
+	      addProducer(result);
+	      return result;
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -479,68 +612,109 @@
          throw new IllegalStateException("Cannot create temporary queue for javax.jms.TopicSession");
          
       }
-      Session session = getSession();
-      if (trace)
-         log.trace("createTemporaryQueue " + session);
-      TemporaryQueue temp = session.createTemporaryQueue();
-      if (trace)
-         log.trace("createdTemporaryQueue " + session + " temp=" + temp);
-      sf.addTemporaryQueue(temp);
-      return temp;
+      lock();
+      try
+      {
+	      Session session = getSession();
+	      if (trace)
+	         log.trace("createTemporaryQueue " + session);
+	      TemporaryQueue temp = session.createTemporaryQueue();
+	      if (trace)
+	         log.trace("createdTemporaryQueue " + session + " temp=" + temp);
+	      sf.addTemporaryQueue(temp);
+	      return temp;
+      }
+      finally
+      {
+    	  unlock();
+      }
    }
 
    // -- JMS 1.1
 
    public MessageConsumer createConsumer(Destination destination) throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createConsumer " + session + " dest=" + destination);
-      MessageConsumer result = session.createConsumer(destination);
-      result = new JmsMessageConsumer(result, this);
-      if (trace)
-         log.trace("createdConsumer " + session + " consumer=" + result);
-      addConsumer(result);
-      return result;
+	  lock();
+	  try
+	  {
+	      Session session = getSession();
+	      if (trace)
+	         log.trace("createConsumer " + session + " dest=" + destination);
+	      MessageConsumer result = session.createConsumer(destination);
+	      result = new JmsMessageConsumer(result, this);
+	      if (trace)
+	         log.trace("createdConsumer " + session + " consumer=" + result);
+	      addConsumer(result);
+	      return result;
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector);
-      MessageConsumer result = session.createConsumer(destination, messageSelector);
-      result = new JmsMessageConsumer(result, this);
-      if (trace)
-         log.trace("createdConsumer " + session + " consumer=" + result);
-      addConsumer(result);
-      return result;
+	  lock();
+	  try
+	  {
+	      Session session = getSession();
+	      if (trace)
+	         log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector);
+	      MessageConsumer result = session.createConsumer(destination, messageSelector);
+	      result = new JmsMessageConsumer(result, this);
+	      if (trace)
+	         log.trace("createdConsumer " + session + " consumer=" + result);
+	      addConsumer(result);
+	      return result;
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
          throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector + " noLocal=" + noLocal);
-      MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal);
-      result = new JmsMessageConsumer(result, this);
-      if (trace)
-         log.trace("createdConsumer " + session + " consumer=" + result);
-      addConsumer(result);
-      return result;
+	  lock();
+	  try
+	  {
+	      Session session = getSession();
+	      if (trace)
+	         log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector + " noLocal=" + noLocal);
+	      MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal);
+	      result = new JmsMessageConsumer(result, this);
+	      if (trace)
+	         log.trace("createdConsumer " + session + " consumer=" + result);
+	      addConsumer(result);
+	      return result;
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public MessageProducer createProducer(Destination destination) throws JMSException
    {
-      Session session = getSession();
-      if (trace)
-         log.trace("createProducer " + session + " dest=" + destination);
-      MessageProducer result = getSession().createProducer(destination);
-      if (trace)
-         log.trace("createdProducer " + session + " producer=" + result);
-      addProducer(result);
-      return result;
+	  lock();
+	  try
+	  {
+	      Session session = getSession();
+	      if (trace)
+	         log.trace("createProducer " + session + " dest=" + destination);
+	      MessageProducer result = getSession().createProducer(destination);
+	      result = new JmsMessageProducer(result, this);
+	      if (trace)
+	         log.trace("createdProducer " + session + " producer=" + result);
+	      addProducer(result);
+	      return result;
+	  }
+	  finally
+	  {
+		  unlock();
+	  }
    }
 
    public int getAcknowledgeMode() throws JMSException
@@ -617,10 +791,10 @@
          {
             for (Iterator i = producers.iterator(); i.hasNext();)
             {
-               MessageProducer producer = (MessageProducer) i.next();
+               JmsMessageProducer producer = (JmsMessageProducer) i.next();
                try
                {
-                  producer.close();
+                  producer.closeProducer();
                }
                catch (Throwable t)
                {




More information about the jboss-cvs-commits mailing list