[jboss-cvs] JBossAS SVN: r68306 - branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Dec 14 14:08:43 EST 2007
Author: adrian at jboss.org
Date: 2007-12-14 14:08:43 -0500 (Fri, 14 Dec 2007)
New Revision: 68306
Added:
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java
Modified:
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsConnectionFactoryImpl.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueReceiver.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSessionFactoryImpl.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicSubscriber.java
Log:
[JBAS-5084] - Throw a javax.jms.IllegalStateException for jta jms activity in a failed transaction
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsConnectionFactoryImpl.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsConnectionFactoryImpl.java 2007-12-14 19:07:09 UTC (rev 68305)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsConnectionFactoryImpl.java 2007-12-14 19:08:43 UTC (rev 68306)
@@ -21,6 +21,8 @@
*/
package org.jboss.resource.adapter.jms;
+import java.sql.SQLException;
+
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
@@ -31,6 +33,8 @@
import javax.resource.spi.ManagedConnectionFactory;
import org.jboss.logging.Logger;
+import org.jboss.resource.connectionmanager.JTATransactionChecker;
+import org.jboss.util.NestedSQLException;
/**
* The the connection factory implementation for the JMS RA.
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java 2007-12-14 19:07:09 UTC (rev 68305)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java 2007-12-14 19:08:43 UTC (rev 68306)
@@ -80,15 +80,22 @@
session.removeConsumer(this);
}
}
+
+ void checkState() throws JMSException
+ {
+ session.checkTransactionActive();
+ }
public MessageListener getMessageListener() throws JMSException
{
+ checkState();
session.checkStrict();
return consumer.getMessageListener();
}
public String getMessageSelector() throws JMSException
{
+ checkState();
return consumer.getMessageSelector();
}
@@ -96,6 +103,7 @@
{
if (trace)
log.trace("receive " + this);
+ checkState();
Message message = consumer.receive();
if (trace)
log.trace("received " + this + " result=" + message);
@@ -109,6 +117,7 @@
{
if (trace)
log.trace("receive " + this + " timeout=" + timeout);
+ checkState();
Message message = consumer.receive(timeout);
if (trace)
log.trace("received " + this + " result=" + message);
@@ -122,6 +131,7 @@
{
if (trace)
log.trace("receiveNoWait " + this);
+ checkState();
Message message = consumer.receiveNoWait();
if (trace)
log.trace("received " + this + " result=" + message);
@@ -133,6 +143,7 @@
public void setMessageListener(MessageListener listener) throws JMSException
{
+ checkState();
session.checkStrict();
if (listener == null)
consumer.setMessageListener(null);
Added: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java (rev 0)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsMessageProducer.java 2007-12-14 19:08:43 UTC (rev 68306)
@@ -0,0 +1,184 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.resource.adapter.jms;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+import org.jboss.logging.Logger;
+
+/**
+ * Wraps a MessageProducer: every send calls checkState() first; accessors delegate directly.
+ *
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @version $Revision: 1.1 $
+ */
+public class JmsMessageProducer implements MessageProducer
+{
+ private static final Logger log = Logger.getLogger(JmsMessageProducer.class);
+
+ /** The wrapped message producer */
+ MessageProducer producer;
+
+ /** The session for this producer */
+ JmsSession session;
+
+ /** Whether trace logging was enabled when this wrapper was created */
+ private boolean trace = log.isTraceEnabled();
+
+ /**
+ * Create a new wrapper
+ *
+ * @param producer the producer to delegate to
+ * @param session the session that owns this producer
+ */
+ public JmsMessageProducer(MessageProducer producer, JmsSession session)
+ {
+ this.producer = producer;
+ this.session = session;
+
+ if (trace)
+ log.trace("new JmsMessageProducer " + this + " producer=" + producer + " session=" + session);
+ }
+
+ public void close() throws JMSException
+ {
+ if (trace)
+ log.trace("close " + this);
+ try
+ {
+ closeProducer();
+ }
+ finally
+ {
+ session.removeProducer(this); // always deregister, even if closing the delegate threw
+ }
+ }
+
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
+ throws JMSException
+ {
+ if (trace)
+ log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+ checkState(); // checked before delegating to the wrapped producer
+ producer.send(destination, message, deliveryMode, priority, timeToLive);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+
+ public void send(Destination destination, Message message) throws JMSException
+ {
+ if (trace)
+ log.trace("send " + this + " destination=" + destination + " message=" + message);
+ checkState();
+ producer.send(destination, message);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+
+ public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+ {
+ if (trace)
+ log.trace("send " + this + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+ checkState();
+ producer.send(message, deliveryMode, priority, timeToLive);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+
+ public void send(Message message) throws JMSException
+ {
+ if (trace)
+ log.trace("send " + this + " message=" + message);
+ checkState();
+ producer.send(message);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+
+ public int getDeliveryMode() throws JMSException
+ {
+ return producer.getDeliveryMode();
+ }
+
+ public Destination getDestination() throws JMSException
+ {
+ return producer.getDestination();
+ }
+
+ public boolean getDisableMessageID() throws JMSException
+ {
+ return producer.getDisableMessageID();
+ }
+
+ public boolean getDisableMessageTimestamp() throws JMSException
+ {
+ return producer.getDisableMessageTimestamp();
+ }
+
+ public int getPriority() throws JMSException
+ {
+ return producer.getPriority();
+ }
+
+ public long getTimeToLive() throws JMSException
+ {
+ return producer.getTimeToLive();
+ }
+
+ public void setDeliveryMode(int deliveryMode) throws JMSException
+ {
+ producer.setDeliveryMode(deliveryMode);
+ }
+
+ public void setDisableMessageID(boolean value) throws JMSException
+ {
+ producer.setDisableMessageID(value);
+ }
+
+ public void setDisableMessageTimestamp(boolean value) throws JMSException
+ {
+ producer.setDisableMessageTimestamp(value);
+ }
+
+ public void setPriority(int defaultPriority) throws JMSException
+ {
+ producer.setPriority(defaultPriority);
+ }
+
+ public void setTimeToLive(long timeToLive) throws JMSException
+ {
+ producer.setTimeToLive(timeToLive);
+ }
+
+ void checkState() throws JMSException
+ {
+ session.checkTransactionActive(); // the owning session performs the transaction check
+ }
+
+ void closeProducer() throws JMSException
+ {
+ producer.close(); // closes the wrapped producer only; does not call session.removeProducer
+ }
+}
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueReceiver.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueReceiver.java 2007-12-14 19:07:09 UTC (rev 68305)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueReceiver.java 2007-12-14 19:08:43 UTC (rev 68306)
@@ -46,6 +46,7 @@
public Queue getQueue() throws JMSException
{
+ checkState();
return ((QueueReceiver) consumer).getQueue();
}
}
Added: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java (rev 0)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsQueueSender.java 2007-12-14 19:08:43 UTC (rev 68306)
@@ -0,0 +1,79 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.resource.adapter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+import org.jboss.logging.Logger;
+
+/**
+ * QueueSender flavour of JmsMessageProducer; the Queue-typed sends call checkState() first.
+ *
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @version $Revision: 1.1 $
+ */
+public class JmsQueueSender extends JmsMessageProducer implements QueueSender
+{
+ private static final Logger log = Logger.getLogger(JmsQueueSender.class);
+
+ /** Whether trace logging was enabled when this wrapper was created */
+ private boolean trace = log.isTraceEnabled();
+
+ /**
+ * Create a new wrapper
+ *
+ * @param producer the queue sender to delegate to
+ * @param session the session that owns this sender
+ */
+ public JmsQueueSender(QueueSender producer, JmsSession session)
+ {
+ super(producer, session);
+ }
+
+ public Queue getQueue() throws JMSException
+ {
+ return ((QueueSender) producer).getQueue(); // constructor takes a QueueSender, so the cast is expected to hold
+ }
+
+ public void send(Queue destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+ {
+ if (trace)
+ log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+ checkState(); // checked before delegating to the wrapped producer
+ producer.send(destination, message, deliveryMode, priority, timeToLive);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+
+ public void send(Queue destination, Message message) throws JMSException
+ {
+ if (trace)
+ log.trace("send " + this + " destination=" + destination + " message=" + message);
+ checkState();
+ producer.send(destination, message);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+}
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java 2007-12-14 19:07:09 UTC (rev 68305)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSession.java 2007-12-14 19:08:43 UTC (rev 68306)
@@ -73,7 +73,7 @@
private JmsConnectionRequestInfo info;
/** The session factory for this session */
- private JmsSessionFactory sf;
+ private JmsSessionFactoryImpl sf;
/** The message consumers */
private HashSet consumers = new HashSet();
@@ -97,7 +97,7 @@
log.trace("new JmsSession " + this + " mc=" + mc + " cri=" + info);
}
- public void setJmsSessionFactory(JmsSessionFactory sf)
+ public void setJmsSessionFactory(JmsSessionFactoryImpl sf)
{
this.sf = sf;
}
@@ -115,12 +115,20 @@
if (mc == null)
throw new IllegalStateException("The session is closed");
+ checkTransactionActive();
+
Session session = mc.getSession();
if (trace)
log.trace("getSession " + session + " for " + this);
return session;
}
+ void checkTransactionActive() throws IllegalStateException
+ {
+ if (sf != null)
+ sf.checkTransactionActive();
+ }
+
// ---- Session API
public BytesMessage createBytesMessage() throws JMSException
@@ -350,6 +358,7 @@
if (trace)
log.trace("createPublisher " + session + " topic=" + topic);
TopicPublisher result = session.createPublisher(topic);
+ result = new JmsTopicPublisher(result, this);
if (trace)
log.trace("createdPublisher " + session + " publisher=" + result);
addProducer(result);
@@ -466,6 +475,7 @@
if (trace)
log.trace("createSender " + session + " queue=" + queue);
QueueSender result = session.createSender(queue);
+ result = new JmsQueueSender(result, this);
if (trace)
log.trace("createdSender " + session + " sender=" + result);
addProducer(result);
@@ -537,6 +547,7 @@
if (trace)
log.trace("createProducer " + session + " dest=" + destination);
MessageProducer result = getSession().createProducer(destination);
+ result = new JmsMessageProducer(result, this);
if (trace)
log.trace("createdProducer " + session + " producer=" + result);
addProducer(result);
@@ -617,10 +628,10 @@
{
for (Iterator i = producers.iterator(); i.hasNext();)
{
- MessageProducer producer = (MessageProducer) i.next();
+ JmsMessageProducer producer = (JmsMessageProducer) i.next();
try
{
- producer.close();
+ producer.closeProducer();
}
catch (Throwable t)
{
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSessionFactoryImpl.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSessionFactoryImpl.java 2007-12-14 19:07:09 UTC (rev 68305)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsSessionFactoryImpl.java 2007-12-14 19:08:43 UTC (rev 68306)
@@ -45,6 +45,7 @@
import javax.resource.spi.ManagedConnectionFactory;
import org.jboss.logging.Logger;
+import org.jboss.resource.connectionmanager.JTATransactionChecker;
/**
* Implements the JMS Connection API and produces {@link JmsSession} objects.
@@ -387,16 +388,33 @@
if (trace)
log.trace("Allocating session for " + this + " with request info=" + info);
JmsSession session = (JmsSession) cm.allocateConnection(mcf, info);
- if (trace)
- log.trace("Allocated " + this + " session=" + session);
- session.setJmsSessionFactory(this);
- if (started)
- session.start();
- sessions.add(session);
- return session;
+ try
+ {
+ if (trace)
+ log.trace("Allocated " + this + " session=" + session);
+ session.setJmsSessionFactory(this);
+ if (started)
+ session.start();
+ sessions.add(session);
+ return session;
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ if (t instanceof Exception)
+ throw (Exception) t;
+ else
+ throw new RuntimeException("Unexpected error: ", t);
+ }
}
}
- catch (ResourceException e)
+ catch (Exception e)
{
log.error("could not create session", e);
@@ -411,5 +429,29 @@
{
if (closed)
throw new IllegalStateException("The connection is closed");
+ checkTransactionActive();
}
+
+ /**
+ * Check whether a transaction is active
+ *
+ * @throws IllegalStateException if the transaction is not active, preparing, prepared or committing or for any error in the transaction manager
+ */
+ protected void checkTransactionActive() throws IllegalStateException
+ {
+ if (cm == null)
+ throw new IllegalStateException("No connection manager");
+ try
+ {
+ if (cm instanceof JTATransactionChecker)
+ ((JTATransactionChecker) cm).checkTransactionActive();
+ }
+ catch (Exception e)
+ {
+ IllegalStateException ex = new IllegalStateException("Transaction not active");
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
+ }
}
Added: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java (rev 0)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicPublisher.java 2007-12-14 19:08:43 UTC (rev 68306)
@@ -0,0 +1,100 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.resource.adapter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+import org.jboss.logging.Logger;
+
+/**
+ * TopicPublisher flavour of JmsMessageProducer; every publish calls checkState() first.
+ *
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @version $Revision: 1.1 $
+ */
+public class JmsTopicPublisher extends JmsMessageProducer implements TopicPublisher
+{
+ private static final Logger log = Logger.getLogger(JmsTopicPublisher.class);
+
+ /** Whether trace logging was enabled when this wrapper was created */
+ private boolean trace = log.isTraceEnabled();
+
+ /**
+ * Create a new wrapper
+ *
+ * @param producer the topic publisher to delegate to
+ * @param session the session that owns this publisher
+ */
+ public JmsTopicPublisher(TopicPublisher producer, JmsSession session)
+ {
+ super(producer, session);
+ }
+
+ public Topic getTopic() throws JMSException
+ {
+ return ((TopicPublisher) producer).getTopic(); // constructor takes a TopicPublisher, so the cast is expected to hold
+ }
+
+ public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+ {
+ if (trace) // NOTE(review): trace text says "send"/"sent" although this is publish - confirm intended
+ log.trace("send " + this + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+ checkState(); // checked before delegating to the wrapped publisher
+ ((TopicPublisher) producer).publish(message, deliveryMode, priority, timeToLive);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+
+ public void publish(Message message) throws JMSException
+ {
+ if (trace)
+ log.trace("send " + this + " message=" + message);
+ checkState();
+ ((TopicPublisher) producer).publish(message);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+
+ public void publish(Topic destination, Message message, int deliveryMode, int priority, long timeToLive)
+ throws JMSException
+ {
+ if (trace)
+ log.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+ checkState();
+ ((TopicPublisher) producer).publish(destination, message, deliveryMode, priority, timeToLive);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+
+ public void publish(Topic destination, Message message) throws JMSException
+ {
+ if (trace)
+ log.trace("send " + this + " destination=" + destination + " message=" + message);
+ checkState();
+ ((TopicPublisher) producer).publish(destination, message);
+ if (trace)
+ log.trace("sent " + this + " result=" + message);
+ }
+}
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicSubscriber.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicSubscriber.java 2007-12-14 19:07:09 UTC (rev 68305)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/JmsTopicSubscriber.java 2007-12-14 19:08:43 UTC (rev 68306)
@@ -46,11 +46,13 @@
public boolean getNoLocal() throws JMSException
{
+ checkState();
return ((TopicSubscriber) consumer).getNoLocal();
}
public Topic getTopic() throws JMSException
{
+ checkState();
return ((TopicSubscriber) consumer).getTopic();
}
}
More information about the jboss-cvs-commits
mailing list