[jboss-cvs] JBossAS SVN: r71467 - in branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter: jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Mar 31 05:23:50 EDT 2008
Author: vicky.kak at jboss.com
Date: 2008-03-31 05:23:50 -0400 (Mon, 31 Mar 2008)
New Revision: 71467
Added:
branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java
Modified:
branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java
branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java
branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java
branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java
Log:
[JBPAPP-662]Add a lock for the session to avoid racing between jms activity and asynchronous rollback
Modified: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java 2008-03-31 09:22:02 UTC (rev 71466)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jdbc/BaseWrapperManagedConnection.java 2008-03-31 09:23:50 UTC (rev 71467)
@@ -34,6 +34,8 @@
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
@@ -66,6 +68,8 @@
private final int transactionIsolation;
private final boolean readOnly;
+ private ReentrantLock lock = new ReentrantLock();
+
private final Collection cels = new ArrayList();
private final Set handles = new HashSet();
private PreparedStatementCache psCache = null;
@@ -194,6 +198,11 @@
mcf.log.warn("Error resetting transaction isolation ", e);
}
}
+ // I'm recreating the lock object when we return to the pool
+ // because it looks too nasty to expect the connection handle
+ // to unlock properly in certain race conditions
+ // where the dissociation of the managed connection is "random".
+ lock = new ReentrantLock();
}
}
@@ -241,6 +250,35 @@
}
}
+ protected void lock()
+ {
+ lock.lock();
+ }
+
+ protected void tryLock() throws SQLException
+ {
+ int tryLock = mcf.getUseTryLock();
+ if (tryLock <= 0)
+ {
+ lock();
+ return;
+ }
+ try
+ {
+ if (lock.tryLock(tryLock, TimeUnit.SECONDS) == false)
+ throw new SQLException("Unable to obtain lock in " + tryLock + " seconds: " + this);
+ }
+ catch (InterruptedException e)
+ {
+ throw new SQLException("Interrupted attempting lock: " + this);
+ }
+ }
+
+ protected void unlock()
+ {
+ lock.unlock();
+ }
+
void closeHandle(WrappedConnection handle)
{
synchronized (stateLock)
Modified: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java 2008-03-31 09:22:02 UTC (rev 71466)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java 2008-03-31 09:23:50 UTC (rev 71467)
@@ -27,12 +27,15 @@
import java.util.Iterator;
import java.util.Set;
import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
+import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
@@ -141,6 +144,8 @@
private String pwd;
private boolean isDestroyed;
+ private ReentrantLock lock = new ReentrantLock();
+
// Physical JMS connection stuff
private Connection con;
private Session session;
@@ -332,6 +337,13 @@
// destory handles
destroyHandles();
+
+ // I'm recreating the lock object when we return to the pool
+ // because it looks too nasty to expect the connection handle
+ // to unlock properly in certain race conditions
+ // where the dissociation of the managed connection is "random".
+ lock = new ReentrantLock();
+
}
/**
@@ -360,6 +372,35 @@
("ManagedConnection in an illegal state");
}
+ protected void lock()
+ {
+ lock.lock();
+ }
+
+ protected void tryLock() throws JMSException
+ {
+ int tryLock = mcf.getUseTryLock();
+ if (tryLock <= 0)
+ {
+ lock();
+ return;
+ }
+ try
+ {
+ if (lock.tryLock(tryLock, TimeUnit.SECONDS) == false)
+ throw new ResourceAllocationException("Unable to obtain lock in " + tryLock + " seconds: " + this);
+ }
+ catch (InterruptedException e)
+ {
+ throw new ResourceAllocationException("Interrupted attempting lock: " + this);
+ }
+ }
+
+ protected void unlock()
+ {
+ lock.unlock();
+ }
+
/**
* Add a connection event listener.
*
@@ -412,6 +453,7 @@
if (log.isTraceEnabled())
log.trace("XAResource=" + xaResource);
+ xaResource = new JmsXAResource(this, xaResource);
return xaResource;
}
Modified: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java 2008-03-31 09:22:02 UTC (rev 71466)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java 2008-03-31 09:23:50 UTC (rev 71467)
@@ -57,6 +57,9 @@
/** For local access. */
private JMSProviderAdapter adapter;
+
+ /** The try lock */
+ private int useTryLock = 0;
public JmsManagedConnectionFactory()
{
@@ -314,6 +317,26 @@
return adapter;
}
+ /**
+ * Get the useTryLock.
+ *
+ * @return the useTryLock.
+ */
+ public int getUseTryLock()
+ {
+ return useTryLock;
+ }
+
+ /**
+ * Set the useTryLock.
+ *
+ * @param useTryLock the useTryLock.
+ */
+ public void setUseTryLock(int useTryLock)
+ {
+ this.useTryLock = useTryLock;
+ }
+
private ConnectionRequestInfo getInfo(ConnectionRequestInfo info)
{
if (info == null)
Modified: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java 2008-03-31 09:22:02 UTC (rev 71466)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java 2008-03-31 09:23:50 UTC (rev 71467)
@@ -81,63 +81,106 @@
}
}
+ void checkState() throws JMSException
+ {
+ session.checkTransactionActive();
+ }
+
public MessageListener getMessageListener() throws JMSException
{
+ checkState();
session.checkStrict();
return consumer.getMessageListener();
}
public String getMessageSelector() throws JMSException
{
+ checkState();
return consumer.getMessageSelector();
}
public Message receive() throws JMSException
{
- if (trace)
- log.trace("receive " + this);
- Message message = consumer.receive();
- if (trace)
- log.trace("received " + this + " result=" + message);
- if (message == null)
- return null;
- else
- return wrapMessage(message);
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("receive " + this);
+ checkState();
+ Message message = consumer.receive();
+ if (trace)
+ log.trace("received " + this + " result=" + message);
+ if (message == null)
+ return null;
+ else
+ return wrapMessage(message);
+ }
+ finally
+ {
+ session.unlock();
+ }
}
public Message receive(long timeout) throws JMSException
{
- if (trace)
- log.trace("receive " + this + " timeout=" + timeout);
- Message message = consumer.receive(timeout);
- if (trace)
- log.trace("received " + this + " result=" + message);
- if (message == null)
- return null;
- else
- return wrapMessage(message);
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("receive " + this + " timeout=" + timeout);
+ checkState();
+ Message message = consumer.receive(timeout);
+ if (trace)
+ log.trace("received " + this + " result=" + message);
+ if (message == null)
+ return null;
+ else
+ return wrapMessage(message);
+ }
+ finally
+ {
+ session.unlock();
+ }
}
public Message receiveNoWait() throws JMSException
{
- if (trace)
- log.trace("receiveNoWait " + this);
- Message message = consumer.receiveNoWait();
- if (trace)
- log.trace("received " + this + " result=" + message);
- if (message == null)
- return null;
- else
- return wrapMessage(message);
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("receiveNoWait " + this);
+ checkState();
+ Message message = consumer.receiveNoWait();
+ if (trace)
+ log.trace("received " + this + " result=" + message);
+ if (message == null)
+ return null;
+ else
+ return wrapMessage(message);
+ }
+ finally
+ {
+ session.unlock();
+ }
}
public void setMessageListener(MessageListener listener) throws JMSException
{
- session.checkStrict();
- if (listener == null)
- consumer.setMessageListener(null);
- else
- consumer.setMessageListener(wrapMessageListener(listener));
+ session.lock();
+ try
+ {
+ checkState();
+ session.checkStrict();
+ if (listener == null)
+ consumer.setMessageListener(null);
+ else
+ consumer.setMessageListener(wrapMessageListener(listener));
+ }
+ finally
+ {
+ session.unlock();
+ }
}
void closeConsumer() throws JMSException
Added: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java (rev 0)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java 2008-03-31 09:23:50 UTC (rev 71467)
@@ -0,0 +1,216 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.resource.adapter.jms;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+import org.jboss.logging.Logger;
+
+/**
+ * JmsMessageProducer.
+ *
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @version $Revision: 1.1 $
+ */
+public class JmsMessageProducer implements MessageProducer
+{
+ private static final Logger log = Logger.getLogger(JmsMessageConsumer.class);
+
+ /** The wrapped message producer */
+ MessageProducer producer;
+
+ /** The session for this consumer */
+ JmsSession session;
+
+ /** Whether trace is enabled */
+ private boolean trace = log.isTraceEnabled();
+
+ /**
+ * Create a new wrapper
+ *
+ * @param producer the producer
+ * @param session the session
+ */
+ public JmsMessageProducer(MessageProducer producer, JmsSession session)
+ {
+ this.producer = producer;
+ this.session = session;
+
+ if (trace)
+ log.trace("new JmsMessageProducer " + this + " producer=" + producer + " session=" + session);
+ }
+
+ public void close() throws JMSException
+ {
+ if (trace)
+ log.trace("close " + this);
+ try
+ {
+ closeProducer();
+ }
+ finally
+ {
+ session.removeProducer(this);
+ }
+ }
+
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
+ throws JMSException
+ {
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+ checkState();
+ producer.send(destination, message, deliveryMode, priority, timeToLive);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+ finally
+ {
+ session.unlock();
+ }
+ }
+
+ public void send(Destination destination, Message message) throws JMSException
+ {
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("send " + this + " destination=" + destination + " message=" + message);
+ checkState();
+ producer.send(destination, message);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+ finally
+ {
+ session.unlock();
+ }
+ }
+
+ public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+ {
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("send " + this + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+ checkState();
+ producer.send(message, deliveryMode, priority, timeToLive);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+ finally
+ {
+ session.unlock();
+ }
+ }
+
+ public void send(Message message) throws JMSException
+ {
+ session.lock();
+ try
+ {
+ if (trace)
+ log.trace("send " + this + " message=" + message);
+ checkState();
+ producer.send(message);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+ finally
+ {
+ session.unlock();
+ }
+ }
+
+ public int getDeliveryMode() throws JMSException
+ {
+ return producer.getDeliveryMode();
+ }
+
+ public Destination getDestination() throws JMSException
+ {
+ return producer.getDestination();
+ }
+
+ public boolean getDisableMessageID() throws JMSException
+ {
+ return producer.getDisableMessageID();
+ }
+
+ public boolean getDisableMessageTimestamp() throws JMSException
+ {
+ return producer.getDisableMessageTimestamp();
+ }
+
+ public int getPriority() throws JMSException
+ {
+ return producer.getPriority();
+ }
+
+ public long getTimeToLive() throws JMSException
+ {
+ return producer.getTimeToLive();
+ }
+
+ public void setDeliveryMode(int deliveryMode) throws JMSException
+ {
+ producer.setDeliveryMode(deliveryMode);
+ }
+
+ public void setDisableMessageID(boolean value) throws JMSException
+ {
+ producer.setDisableMessageID(value);
+ }
+
+ public void setDisableMessageTimestamp(boolean value) throws JMSException
+ {
+ producer.setDisableMessageTimestamp(value);
+ }
+
+ public void setPriority(int defaultPriority) throws JMSException
+ {
+ producer.setPriority(defaultPriority);
+ }
+
+ public void setTimeToLive(long timeToLive) throws JMSException
+ {
+ producer.setTimeToLive(timeToLive);
+ }
+
+ void checkState() throws JMSException
+ {
+ session.checkTransactionActive();
+ }
+
+ void closeProducer() throws JMSException
+ {
+ producer.close();
+ }
+}
Modified: branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java 2008-03-31 09:22:02 UTC (rev 71466)
+++ branches/JBPAPP_4_2_0_GA_CP/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java 2008-03-31 09:23:50 UTC (rev 71467)
@@ -41,6 +41,7 @@
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
+import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
@@ -73,7 +74,7 @@
private JmsConnectionRequestInfo info;
/** The session factory for this session */
- private JmsSessionFactory sf;
+ private JmsSessionFactoryImpl sf;
/** The message consumers */
private HashSet consumers = new HashSet();
@@ -97,11 +98,29 @@
log.trace("new JmsSession " + this + " mc=" + mc + " cri=" + info);
}
- public void setJmsSessionFactory(JmsSessionFactory sf)
+ public void setJmsSessionFactory(JmsSessionFactoryImpl sf)
{
this.sf = sf;
}
+ protected void lock() throws JMSException
+ {
+ JmsManagedConnection mc = this.mc;
+ if (mc != null)
+ mc.tryLock();
+ else
+ throw new ResourceAllocationException("Connection is not associated with a managed connection. " + this);
+ }
+
+ protected void unlock()
+ {
+ JmsManagedConnection mc = this.mc;
+ if (mc != null)
+ mc.unlock();
+ // We recreate the lock when returned to the pool
+ // so missing the unlock after disassociation is not important
+ }
+
/**
* Ensure that the session is opened.
*
@@ -115,12 +134,20 @@
if (mc == null)
throw new IllegalStateException("The session is closed");
+ checkTransactionActive();
+
Session session = mc.getSession();
if (trace)
log.trace("getSession " + session + " for " + this);
return session;
}
+ void checkTransactionActive() throws IllegalStateException
+ {
+ if (sf != null)
+ sf.checkTransactionActive();
+ }
+
// ---- Session API
public BytesMessage createBytesMessage() throws JMSException
@@ -240,32 +267,56 @@
// FIXME - is this really OK, probably not
public void commit() throws JMSException
{
- Session session = getSession();
- if (info.isTransacted() == false)
- throw new IllegalStateException("Session is not transacted");
- if (trace)
- log.trace("Commit session " + this);
- session.commit();
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (info.isTransacted() == false)
+ throw new IllegalStateException("Session is not transacted");
+ if (trace)
+ log.trace("Commit session " + this);
+ session.commit();
+ }
+ finally
+ {
+ unlock();
+ }
}
public void rollback() throws JMSException
{
- Session session = getSession();
- if (info.isTransacted() == false)
- throw new IllegalStateException("Session is not transacted");
- if (trace)
- log.trace("Rollback session " + this);
- session.rollback();
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (info.isTransacted() == false)
+ throw new IllegalStateException("Session is not transacted");
+ if (trace)
+ log.trace("Rollback session " + this);
+ session.rollback();
+ }
+ finally
+ {
+ unlock();
+ }
}
public void recover() throws JMSException
{
- Session session = getSession();
- if (info.isTransacted())
- throw new IllegalStateException("Session is transacted");
- if (trace)
- log.trace("Recover session " + this);
- session.recover();
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (info.isTransacted())
+ throw new IllegalStateException("Session is transacted");
+ if (trace)
+ log.trace("Recover session " + this);
+ session.recover();
+ }
+ finally
+ {
+ unlock();
+ }
}
// --- TopicSession API
@@ -288,72 +339,113 @@
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
- TopicSession session = getTopicSession();
- if (trace)
- log.trace("createSubscriber " + session + " topic=" + topic);
- TopicSubscriber result = session.createSubscriber(topic);
- result = new JmsTopicSubscriber(result, this);
- if (trace)
- log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ TopicSession session = getTopicSession();
+ if (trace)
+ log.trace("createSubscriber " + session + " topic=" + topic);
+ TopicSubscriber result = session.createSubscriber(topic);
+ result = new JmsTopicSubscriber(result, this);
+ if (trace)
+ log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
- TopicSession session = getTopicSession();
- if (trace)
- log.trace("createSubscriber " + session + " topic=" + topic + " selector=" + messageSelector + " noLocal=" + noLocal);
- TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal);
- result = new JmsTopicSubscriber(result, this);
- if (trace)
- log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ TopicSession session = getTopicSession();
+ if (trace)
+ log.trace("createSubscriber " + session + " topic=" + topic + " selector=" + messageSelector + " noLocal=" + noLocal);
+ TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal);
+ result = new JmsTopicSubscriber(result, this);
+ if (trace)
+ log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
- if(info.getType() == JmsConnectionFactory.QUEUE)
- {
- throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");
- }
-
- Session session = getSession();
- if (trace)
- log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name);
- TopicSubscriber result = session.createDurableSubscriber(topic, name);
- result = new JmsTopicSubscriber(result, this);
- if (trace)
- log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ if(info.getType() == JmsConnectionFactory.QUEUE)
+ {
+ throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");
+ }
+
+ Session session = getSession();
+ if (trace)
+ log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name);
+ TopicSubscriber result = session.createDurableSubscriber(topic, name);
+ result = new JmsTopicSubscriber(result, this);
+ if (trace)
+ log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
throws JMSException
{
- Session session = getSession();
- if (trace)
- log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name + " selector=" + messageSelector + " noLocal=" + noLocal);
- TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal);
- result = new JmsTopicSubscriber(result, this);
- if (trace)
- log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name + " selector=" + messageSelector + " noLocal=" + noLocal);
+ TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal);
+ result = new JmsTopicSubscriber(result, this);
+ if (trace)
+ log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
- TopicSession session = getTopicSession();
- if (trace)
- log.trace("createPublisher " + session + " topic=" + topic);
- TopicPublisher result = session.createPublisher(topic);
- if (trace)
- log.trace("createdPublisher " + session + " publisher=" + result);
- addProducer(result);
- return result;
+ lock();
+ try
+ {
+ TopicSession session = getTopicSession();
+ if (trace)
+ log.trace("createPublisher " + session + " topic=" + topic);
+ TopicPublisher result = session.createPublisher(topic);
+ result = new JmsTopicPublisher(result, this);
+ if (trace)
+ log.trace("createdPublisher " + session + " publisher=" + result);
+ addProducer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public TemporaryTopic createTemporaryTopic() throws JMSException
@@ -363,14 +455,22 @@
throw new IllegalStateException("Cannot create temporary topic for javax.jms.QueueSession");
}
- Session session = getSession();
- if (trace)
- log.trace("createTemporaryTopic " + session);
- TemporaryTopic temp = session.createTemporaryTopic();
- if (trace)
- log.trace("createdTemporaryTopic " + session + " temp=" + temp);
- sf.addTemporaryTopic(temp);
- return temp;
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("createTemporaryTopic " + session);
+ TemporaryTopic temp = session.createTemporaryTopic();
+ if (trace)
+ log.trace("createdTemporaryTopic " + session + " temp=" + temp);
+ sf.addTemporaryTopic(temp);
+ return temp;
+ }
+ finally
+ {
+ unlock();
+ }
}
public void unsubscribe(String name) throws JMSException
@@ -379,11 +479,19 @@
{
throw new IllegalStateException("Cannot unsubscribe for javax.jms.QueueSession");
}
-
- Session session = getSession();
- if (trace)
- log.trace("unsubscribe " + session + " name=" + name);
- session.unsubscribe(name);
+
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("unsubscribe " + session + " name=" + name);
+ session.unsubscribe(name);
+ }
+ finally
+ {
+ unlock();
+ }
}
//--- QueueSession API
@@ -436,40 +544,65 @@
public QueueReceiver createReceiver(Queue queue) throws JMSException
{
- QueueSession session = getQueueSession();
- if (trace)
- log.trace("createReceiver " + session + " queue=" + queue);
- QueueReceiver result = session.createReceiver(queue);
- result = new JmsQueueReceiver(result, this);
- if (trace)
- log.trace("createdReceiver " + session + " receiver=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ QueueSession session = getQueueSession();
+ if (trace)
+ log.trace("createReceiver " + session + " queue=" + queue);
+ QueueReceiver result = session.createReceiver(queue);
+ result = new JmsQueueReceiver(result, this);
+ if (trace)
+ log.trace("createdReceiver " + session + " receiver=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
{
- QueueSession session = getQueueSession();
- if (trace)
- log.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector);
- QueueReceiver result = session.createReceiver(queue, messageSelector);
- result = new JmsQueueReceiver(result, this);
- if (trace)
- log.trace("createdReceiver " + session + " receiver=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ QueueSession session = getQueueSession();
+ if (trace)
+ log.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector);
+ QueueReceiver result = session.createReceiver(queue, messageSelector);
+ result = new JmsQueueReceiver(result, this);
+ if (trace)
+ log.trace("createdReceiver " + session + " receiver=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public QueueSender createSender(Queue queue) throws JMSException
{
- QueueSession session = getQueueSession();
- if (trace)
- log.trace("createSender " + session + " queue=" + queue);
- QueueSender result = session.createSender(queue);
- if (trace)
- log.trace("createdSender " + session + " sender=" + result);
- addProducer(result);
- return result;
+ lock();
+ try
+ {
+ QueueSession session = getQueueSession();
+ if (trace)
+ log.trace("createSender " + session + " queue=" + queue);
+ QueueSender result = session.createSender(queue);
+ result = new JmsQueueSender(result, this);
+ if (trace)
+ log.trace("createdSender " + session + " sender=" + result);
+ addProducer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -479,68 +612,109 @@
throw new IllegalStateException("Cannot create temporary queue for javax.jms.TopicSession");
}
- Session session = getSession();
- if (trace)
- log.trace("createTemporaryQueue " + session);
- TemporaryQueue temp = session.createTemporaryQueue();
- if (trace)
- log.trace("createdTemporaryQueue " + session + " temp=" + temp);
- sf.addTemporaryQueue(temp);
- return temp;
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("createTemporaryQueue " + session);
+ TemporaryQueue temp = session.createTemporaryQueue();
+ if (trace)
+ log.trace("createdTemporaryQueue " + session + " temp=" + temp);
+ sf.addTemporaryQueue(temp);
+ return temp;
+ }
+ finally
+ {
+ unlock();
+ }
}
// -- JMS 1.1
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
- Session session = getSession();
- if (trace)
- log.trace("createConsumer " + session + " dest=" + destination);
- MessageConsumer result = session.createConsumer(destination);
- result = new JmsMessageConsumer(result, this);
- if (trace)
- log.trace("createdConsumer " + session + " consumer=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("createConsumer " + session + " dest=" + destination);
+ MessageConsumer result = session.createConsumer(destination);
+ result = new JmsMessageConsumer(result, this);
+ if (trace)
+ log.trace("createdConsumer " + session + " consumer=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
- Session session = getSession();
- if (trace)
- log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector);
- MessageConsumer result = session.createConsumer(destination, messageSelector);
- result = new JmsMessageConsumer(result, this);
- if (trace)
- log.trace("createdConsumer " + session + " consumer=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector);
+ MessageConsumer result = session.createConsumer(destination, messageSelector);
+ result = new JmsMessageConsumer(result, this);
+ if (trace)
+ log.trace("createdConsumer " + session + " consumer=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
- Session session = getSession();
- if (trace)
- log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector + " noLocal=" + noLocal);
- MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal);
- result = new JmsMessageConsumer(result, this);
- if (trace)
- log.trace("createdConsumer " + session + " consumer=" + result);
- addConsumer(result);
- return result;
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector + " noLocal=" + noLocal);
+ MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal);
+ result = new JmsMessageConsumer(result, this);
+ if (trace)
+ log.trace("createdConsumer " + session + " consumer=" + result);
+ addConsumer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public MessageProducer createProducer(Destination destination) throws JMSException
{
- Session session = getSession();
- if (trace)
- log.trace("createProducer " + session + " dest=" + destination);
- MessageProducer result = getSession().createProducer(destination);
- if (trace)
- log.trace("createdProducer " + session + " producer=" + result);
- addProducer(result);
- return result;
+ lock();
+ try
+ {
+ Session session = getSession();
+ if (trace)
+ log.trace("createProducer " + session + " dest=" + destination);
+ MessageProducer result = getSession().createProducer(destination);
+ result = new JmsMessageProducer(result, this);
+ if (trace)
+ log.trace("createdProducer " + session + " producer=" + result);
+ addProducer(result);
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
}
public int getAcknowledgeMode() throws JMSException
@@ -617,10 +791,10 @@
{
for (Iterator i = producers.iterator(); i.hasNext();)
{
- MessageProducer producer = (MessageProducer) i.next();
+ JmsMessageProducer producer = (JmsMessageProducer) i.next();
try
{
- producer.close();
+ producer.closeProducer();
}
catch (Throwable t)
{
More information about the jboss-cvs-commits
mailing list