[jboss-svn-commits] JBL Code SVN: r26301 - in labs/jbossesb/workspace: blixen/product/rosetta/src/org/jboss/internal/soa/esb/couriers and 3 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Apr 29 06:15:00 EDT 2009
Author: kevin.conner at jboss.com
Date: 2009-04-29 06:14:59 -0400 (Wed, 29 Apr 2009)
New Revision: 26301
Added:
labs/jbossesb/workspace/blixen/
labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java
Modified:
labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
labs/jbossesb/workspace/blixen/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java
Log:
Added for blixen
Copied: labs/jbossesb/workspace/blixen (from rev 23452, labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA)
Modified: labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2008-10-14 09:28:56 UTC (rev 23452)
+++ labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2009-04-29 10:14:59 UTC (rev 26301)
@@ -23,12 +23,12 @@
package org.jboss.internal.soa.esb.couriers;
import java.io.IOException;
-import java.net.URISyntaxException;
import java.util.List;
import java.util.Properties;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
+import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -41,6 +41,7 @@
import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.couriers.helpers.JmsComposer;
import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionFailureException;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
@@ -176,9 +177,30 @@
* if problems were encountered
*/
public boolean deliver(org.jboss.soa.esb.message.Message message) throws CourierException {
+ try {
+ return internalDeliver(message) ;
+ } catch (final JmsConnectionFailureException jcfe) {
+ // fall through
+ } catch (final IllegalStateException ise) {
+ // fall through
+ }
+
+ cleanup() ;
+
+ try {
+ return internalDeliver(message) ;
+ } catch (final JmsConnectionFailureException jcfe) {
+ throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", jcfe);
+ } catch (final IllegalStateException ise) {
+ throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", ise);
+ }
+ }
+
+ private boolean internalDeliver(org.jboss.soa.esb.message.Message message) throws CourierException, JmsConnectionFailureException, IllegalStateException {
ObjectMessage msg;
if (null == message) {
+ _logger.warn("KEV: null message sent into ESB deliver") ;
return false;
}
@@ -206,11 +228,15 @@
// Set the JMS message from the ESB message...
try {
setJMSProperties(message, msg);
+ } catch (final JmsConnectionFailureException jcfe) {
+ throw jcfe ;
+ } catch (final IllegalStateException ise) {
+ throw ise ;
} catch (JMSException e) {
throw new CourierMarshalUnmarshalException("Failed to set JMS Message properties from ESB Message properties.", e);
}
- return deliver(msg);
+ return internalDeliver(msg);
}
/**
@@ -222,11 +248,32 @@
* if problems were encountered
*/
public boolean deliver(javax.jms.Message message) throws CourierException {
+ try {
+ return internalDeliver(message) ;
+ } catch (final JmsConnectionFailureException jcfe) {
+ // fall through
+ } catch (final IllegalStateException ise) {
+ // fall through
+ }
+
+ cleanup() ;
+
+ try {
+ return internalDeliver(message) ;
+ } catch (final JmsConnectionFailureException jcfe) {
+ throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", jcfe);
+ } catch (final IllegalStateException ise) {
+ throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", ise);
+ }
+ }
+
+ private boolean internalDeliver(javax.jms.Message message) throws CourierException, JmsConnectionFailureException, IllegalStateException {
if (_isReceiver) {
throw new CourierException("This is a read-only Courier");
}
if (null == message) {
+ _logger.warn("KEV: null message sent into JMS deliver") ;
return false;
}
synchronized(this) {
@@ -253,6 +300,12 @@
return true;
}
+ catch (final JmsConnectionFailureException jcfe) {
+ throw jcfe ;
+ }
+ catch (final IllegalStateException ise) {
+ throw ise ;
+ }
catch (JMSException e) {
if (!jmsConnectRetry(e))
throw new CourierTransportException("Caught exception during delivery and could not reconnect! ",e);
@@ -312,7 +365,7 @@
return true;
} // ________________________________
- private void createMessageProducer() throws CourierException, NamingContextException {
+ private void createMessageProducer() throws CourierException, NamingContextException, JmsConnectionFailureException, IllegalStateException {
synchronized(this) {
if (_messageProducer == null) {
try {
@@ -354,6 +407,12 @@
}
}
}
+ catch (final JmsConnectionFailureException jcfe) {
+ throw jcfe ;
+ }
+ catch (final IllegalStateException ise) {
+ throw ise ;
+ }
catch (JMSException ex) {
_logger.debug("Error from JMS system.", ex);
@@ -389,6 +448,26 @@
}
public javax.jms.Message pickupPayload(long millis) throws CourierException, CourierTimeoutException {
+ try {
+ return internalPickupPayload(millis) ;
+ } catch (final JmsConnectionFailureException jcfe) {
+ // fall through
+ } catch (final IllegalStateException ise) {
+ // fall through
+ }
+
+ cleanup() ;
+
+ try {
+ return internalPickupPayload(millis) ;
+ } catch (final JmsConnectionFailureException jcfe) {
+ throw new CourierTransportException("Caught exception during receive and could not reconnect! ", jcfe);
+ } catch (final IllegalStateException ise) {
+ throw new CourierTransportException("Caught exception during receive and could not reconnect! ", ise);
+ }
+ }
+
+ private javax.jms.Message internalPickupPayload(long millis) throws CourierException, CourierTimeoutException, JmsConnectionFailureException, IllegalStateException {
if (!_isReceiver)
throw new CourierException("This is an outgoing-only Courier");
if (millis < 1)
@@ -414,6 +493,12 @@
jmsMessage = _messageConsumer.receive(millis);
break;
}
+ catch (final JmsConnectionFailureException jcfe) {
+ throw jcfe ;
+ }
+ catch (final IllegalStateException ise) {
+ throw ise ;
+ }
catch (JMSException e) {
if (!jmsConnectRetry(e))
throw new CourierTransportException("Caught exception during receive and could not reconnect! ",e);
@@ -467,7 +552,7 @@
esbPropertiesStrategy.setPropertiesFromJMSMessage(fromJMS, toESB);
}
- private void createMessageConsumer() throws CourierException, ConfigurationException, MalformedEPRException, NamingContextException {
+ private void createMessageConsumer() throws CourierException, ConfigurationException, MalformedEPRException, NamingContextException, JmsConnectionFailureException, IllegalStateException {
Context oJndiCtx = null;
synchronized(this) {
@@ -513,6 +598,12 @@
NamingContextPool.releaseNamingContext(oJndiCtx) ;
}
}
+ catch (final JmsConnectionFailureException jcfe) {
+ throw jcfe ;
+ }
+ catch (final IllegalStateException ise) {
+ throw ise ;
+ }
catch (JMSException ex) {
_logger.debug("Error from JMS system.", ex);
Added: labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java
===================================================================
--- labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java (rev 0)
+++ labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java 2009-04-29 10:14:59 UTC (rev 26301)
@@ -0,0 +1,59 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import javax.jms.JMSException;
+
+/**
+ * This exception is thrown from a JMS session/producer/consumer method when the
+ * underlying exception appears to have been closed. This will occur when the connection
+ * has not been closed through the standard JMS exception listener route.
+ *
+ * @author <a href='mailto:kevin.conner at jboss.com'>Kevin Conner</a>
+ */
+public class JmsConnectionFailureException extends JMSException
+{
+ /**
+ * The serial version UID of this exception
+ */
+ private static final long serialVersionUID = 4352490530798177236L;
+
+ /**
+ * Construct an jms connection failure exception with the specified message.
+ * @param message The exception message.
+ */
+ public JmsConnectionFailureException(final String message)
+ {
+ super(message) ;
+ }
+
+ /**
+ * Construct an jms connection failure exception with the specified message and associated cause.
+ * @param message The exception message.
+ * @param cause The associated cause.
+ */
+ public JmsConnectionFailureException(String message, Throwable cause)
+ {
+ this(message) ;
+ initCause(cause) ;
+ }
+}
Property changes on: labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Modified: labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2008-10-14 09:28:56 UTC (rev 23452)
+++ labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2009-04-29 10:14:59 UTC (rev 26301)
@@ -161,15 +161,17 @@
private synchronized void addAnotherSession(Map<String, String> poolKey, final boolean transacted, final int acknowledgeMode)
throws JMSException
{
+ final long currentID = id ;
+ final Connection currentConnection = jmsConnection ;
final Future<JmsSession> future = COMPLETION_SERVICE.submit(new Callable<JmsSession>() {
public JmsSession call()
throws JMSException
{
final JmsSession session ;
if (transacted) {
- session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession(), id);
+ session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)currentConnection).createXASession(), currentID);
} else {
- session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode), id);
+ session = new JmsSession(JmsConnectionPool.this, currentConnection.createSession(transacted, acknowledgeMode), currentID);
}
return session ;
}
@@ -210,6 +212,24 @@
*/
public synchronized JmsSession getSession(final int acknowledgeMode) throws NamingException, JMSException, ConnectionException
{
+ try
+ {
+ return internalGetSession(acknowledgeMode) ;
+ }
+ catch (final JMSException jmse)
+ {
+ if (messagingConnectionFailure(jmse))
+ {
+ cleanSessionPool() ;
+ return internalGetSession(acknowledgeMode) ;
+ }
+ throw jmse ;
+ }
+ }
+
+ private synchronized JmsSession internalGetSession(final int acknowledgeMode)
+ throws NamingException, JMSException, ConnectionException
+ {
try {
initConnection() ;
} catch (final NamingContextException nce) {
@@ -309,23 +329,36 @@
*/
synchronized void handleCloseSession(final JmsSession session)
{
- if (session.getId() == id)
+ if (session.isSuspect())
{
- final int mode ;
- try {
- mode = session.getAcknowledgeMode() ;
- } catch (final JMSException jmse) {
- logger.warn("JMSException while calling getAcknowledgeMode") ;
- logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
- return ;
+ logger.debug("Session is suspect, dropping") ;
+ handleReleaseSession(session) ;
+ }
+ else
+ {
+ if (session.getId() != id)
+ {
+ logger.debug("Session is from a previous incarnation, dropping") ;
}
-
- final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
- if (sessions != null) {
- sessions.add(session) ;
+ else
+ {
+ final int mode ;
+ try {
+ mode = session.getAcknowledgeMode() ;
+ } catch (final JMSException jmse) {
+ logger.warn("JMSException while calling getAcknowledgeMode") ;
+ logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
+ return ;
+ }
+
+ final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
+ if (sessions != null) {
+ sessions.add(session) ;
+ }
}
+ session.releaseResources() ;
+ releaseInUseSession(session) ;
}
- handleReleaseSession(session) ;
}
/**
@@ -334,7 +367,21 @@
*/
synchronized void handleReleaseSession(final JmsSession session)
{
- session.releaseResources() ;
+ session.releaseResources();
+ try
+ {
+ session.close() ;
+ }
+ catch (final Throwable th) {} // ignore
+ releaseInUseSession(session) ;
+ }
+
+ /**
+ * Release a session from the in use mapping.
+ * @param session The session to release.
+ */
+ private void releaseInUseSession(final JmsSession session)
+ {
final int mode ;
try {
mode = session.getAcknowledgeMode() ;
@@ -564,6 +611,43 @@
}
/**
+ * Check the exception to see whether it indicates a connection failure in JBM
+ * @param jmse The current JMS Exception
+ * @return true if it suggests a failure, false otherwise.
+ */
+ private boolean messagingConnectionFailure(final JMSException jmse)
+ {
+ /*
+ * JBoss Messaging can drop the connection from the server side
+ * without calling back on the exception listener. We check for
+ * IllegalStateException as this appears to be the indicator
+ * exception used by JBoss Messaging when the connection has disappeared.
+ */
+ Throwable cause = jmse ;
+ while(cause.getCause() != null)
+ {
+ cause = cause.getCause() ;
+ }
+ return (cause instanceof IllegalStateException) ;
+ }
+
+ /**
+ * Handle exceptions that occur in the session or its child objects.
+ * @param jmsSession The jmsSession associated with the call.
+ * @param jmse The JMSException to check.
+ * @throws JMSException if the exception is to be overridden.
+ */
+ void handleException(final JmsSession jmsSession, final JMSException jmse)
+ throws JMSException
+ {
+ if (messagingConnectionFailure(jmse))
+ {
+ cleanSessionPool() ;
+ throw new JmsConnectionFailureException("The underlying exception appears to have failed", jmse) ;
+ }
+ }
+
+ /**
* Associate the JMS XA Session with the current transaction.
* @param session The XA session.
* @throws ConnectionException if there is no transaction active.
Modified: labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java 2008-10-14 09:28:56 UTC (rev 23452)
+++ labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java 2009-04-29 10:14:59 UTC (rev 26301)
@@ -22,6 +22,10 @@
package org.jboss.internal.soa.esb.rosetta.pooling;
import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.util.HashSet;
import javax.jms.BytesMessage;
@@ -49,6 +53,10 @@
public class JmsSession implements Session
{
/**
+ * The connection pool.
+ */
+ private final JmsConnectionPool pool ;
+ /**
* The session delegate.
*/
private final Session session ;
@@ -56,6 +64,14 @@
* The pool instance id.
*/
private final long id ;
+ /**
+ * The session acknowledge mode.
+ */
+ private final int acknowledgeMode ;
+ /**
+ * Flag indicating whether this session is suspect or not.
+ */
+ private boolean suspect ;
/**
* The set of active queue browsers.
@@ -72,16 +88,19 @@
/**
* Create the session wrapper.
+ * @param pool The pool associated with this session.
* @param session The session delegate.
* @param id The pool instance id.
* @param isJTA True if this tales part in a JTA transaction
* @throws JMSException
*/
- JmsSession(final Session session, final long id)
+ JmsSession(final JmsConnectionPool pool, final Session session, final long id)
throws JMSException
{
this.id = id ;
- this.session = session ;
+ this.pool = pool ;
+ this.session = (Session)getExceptionHandler(pool, Session.class, session) ;
+ acknowledgeMode = session.getAcknowledgeMode() ;
// Workaround for JBESB-1873
if ("org.jboss.jms.client.JBossSession".equals(session.getClass().getName()))
{
@@ -101,7 +120,9 @@
public void commit() throws JMSException
{
+ setSuspect(true) ;
session.commit();
+ setSuspect(false) ;
}
public QueueBrowser createBrowser(Queue arg0, String arg1)
@@ -224,8 +245,7 @@
public int getAcknowledgeMode() throws JMSException
{
- associate() ;
- return session.getAcknowledgeMode();
+ return acknowledgeMode;
}
public MessageListener getMessageListener() throws JMSException
@@ -248,7 +268,9 @@
public void rollback() throws JMSException
{
+ setSuspect(true) ;
session.rollback();
+ setSuspect(false) ;
}
public void run()
@@ -278,7 +300,7 @@
}
final QueueBrowser result = getQueueBrowser(queueBrowser) ;
queueBrowserSet.add(result) ;
- return result ;
+ return (QueueBrowser)getExceptionHandler(pool, QueueBrowser.class, result) ;
}
private synchronized MessageConsumer trackMessageConsumer(MessageConsumer messageConsumer)
@@ -291,7 +313,7 @@
}
final MessageConsumer result = getMessageConsumer(messageConsumer) ;
messageConsumerSet.add(result) ;
- return result ;
+ return (MessageConsumer)getExceptionHandler(pool, MessageConsumer.class, result) ;
}
private synchronized TopicSubscriber trackTopicSubscriber(TopicSubscriber topicSubscriber)
@@ -304,7 +326,7 @@
}
final TopicSubscriber result = getTopicSubscriber(topicSubscriber) ;
messageConsumerSet.add(result) ;
- return result ;
+ return (TopicSubscriber)getExceptionHandler(pool, TopicSubscriber.class, result) ;
}
private synchronized MessageProducer trackMessageProducer(MessageProducer messageProducer)
@@ -317,7 +339,7 @@
}
final MessageProducer result = getMessageProducer(messageProducer) ;
messageProducerSet.add(result) ;
- return result ;
+ return (MessageProducer)getExceptionHandler(pool, MessageProducer.class, result) ;
}
synchronized void releaseResources()
@@ -352,11 +374,6 @@
}
messageProducerSet = null ;
}
- try
- {
- recover() ;
- }
- catch (final JMSException jmse) {} // ignore
}
protected QueueBrowser getQueueBrowser(QueueBrowser queueBrowser)
@@ -393,4 +410,71 @@
throws JMSException
{
}
+
+ protected void setSuspect(final boolean suspect)
+ {
+ this.suspect = suspect ;
+ }
+
+ public boolean isSuspect()
+ {
+ return suspect ;
+ }
+
+ /**
+ * Wrap the object in an exception handler.
+ * @param pool The current connection pool.
+ * @param instanceClass The interface type of the instance.
+ * @param instance The instance
+ * @return
+ */
+ protected Object getExceptionHandler(final JmsConnectionPool pool,
+ final Class<?> instanceClass, final Object instance)
+ {
+ final InvocationHandler handler = new ExceptionHandler(pool, instance) ;
+ return Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {instanceClass}, handler);
+ }
+
+ /**
+ * Handler responsible for intercepting JMS exceptions and checking for unreported closure.
+ * @author <a href='mailto:kevin.conner at jboss.com'>Kevin Conner</a>
+ */
+ private final class ExceptionHandler implements InvocationHandler
+ {
+ /**
+ * The connection pool.
+ */
+ private final JmsConnectionPool pool ;
+ /**
+ * The target instance.
+ */
+ private final Object target ;
+
+ /**
+ * Construct the handler using the specified target.
+ * @param target The target instance.
+ */
+ public ExceptionHandler(final JmsConnectionPool pool, final Object target)
+ {
+ this.pool = pool ;
+ this.target = target ;
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ {
+ try
+ {
+ return method.invoke(target, args);
+ }
+ catch (final InvocationTargetException ite)
+ {
+ final Throwable th = ite.getCause() ;
+ if (th instanceof JMSException)
+ {
+ pool.handleException(JmsSession.this, (JMSException)th) ;
+ }
+ throw th ;
+ }
+ }
+ }
}
Modified: labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2008-10-14 09:28:56 UTC (rev 23452)
+++ labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2009-04-29 10:14:59 UTC (rev 26301)
@@ -32,6 +32,7 @@
import javax.jms.QueueBrowser;
import javax.jms.TopicSubscriber;
import javax.jms.XASession;
+import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.xa.XAResource;
@@ -78,11 +79,11 @@
JmsXASession(final JmsConnectionPool pool, final XASession session, final long id)
throws JMSException
{
- super(session, id) ;
+ super(pool, session, id) ;
this.pool = pool ;
- this.session = session ;
+ this.session = (XASession)getExceptionHandler(pool, XASession.class, session) ;
}
-
+
@Override
public void commit() throws JMSException
{
@@ -162,11 +163,12 @@
if (!associated)
{
cleanupAction = Cleanup.none ;
- final XAResource resource = session.getXAResource() ;
final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
try
{
transactionStrategy.registerSynchronization(this) ;
+ setSuspect(true) ;
+ final XAResource resource = session.getXAResource() ;
transactionStrategy.enlistResource(resource) ;
}
catch (final TransactionStrategyException tse)
@@ -186,6 +188,7 @@
throw ex ;
}
+ setSuspect(false) ;
associated = true ;
}
}
@@ -200,8 +203,12 @@
switch (cleanupAction)
{
case close:
- pool.handleCloseSession(this) ;
- break ;
+ if (result == Status.STATUS_COMMITTED)
+ {
+ pool.handleCloseSession(this) ;
+ break ;
+ }
+ // fall through
case release:
pool.handleReleaseSession(this) ;
break ;
Modified: labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java 2008-10-14 09:28:56 UTC (rev 23452)
+++ labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java 2009-04-29 10:14:59 UTC (rev 26301)
@@ -571,6 +571,7 @@
} catch (FaultMessageException e) {
throw e;
} catch (final CourierServiceBindException e) {
+ logger.warn("KEV: Caught exception", e) ;
// meant to be masked by the SI fail-over
logger.debug("Caught service lookup exception for EPR [" + targetEPR + "] and Service [" + service + "] and Message ["+message.getHeader()+"]. " + e.getMessage());
@@ -581,6 +582,7 @@
throw new MessageDeliverException("Caught (un)marshal related exception during attempted send/receive.", e);
} catch (final CourierTransportException e) {
+ logger.warn("KEV: Caught exception", e) ;
// meant to be masked by the SI fail-over
logger.debug("Courier indicated transport related error "+e+" during send/receive with EPR [" + targetEPR + "] for Service [" + service + "] and Message ["+message.getHeader()+"]. " + e.getMessage());
Modified: labs/jbossesb/workspace/blixen/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java 2008-10-14 09:28:56 UTC (rev 23452)
+++ labs/jbossesb/workspace/blixen/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java 2009-04-29 10:14:59 UTC (rev 26301)
@@ -23,19 +23,32 @@
import static org.junit.Assert.assertEquals;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.util.Properties;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
import javax.jms.Session;
import javax.naming.Context;
+import javax.transaction.Synchronization;
+import javax.transaction.xa.XAResource;
import junit.framework.JUnit4TestAdapter;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
-import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
+import org.jboss.soa.esb.helpers.NamingContextPool;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockejb.jms.QueueConnectionFactoryImpl;
+import org.mockejb.jndi.MockContextFactory;
/**
* @author kstam
@@ -43,21 +56,42 @@
*
*/
public class JmsConnectionPoolingIntegrationTest {
-
- private Properties environment;
-
- @Before
- public void setup()
- {
- environment = getEnvironment();
- }
+ private static final String CONNECTION_FACTORY = "ConnectionFactory" ;
+
+ private static final String BROKEN_CONNECTION_FACTORY = "BrokenConnectionFactory" ;
+
+ private MockTransactionStrategy transactionStrategy = new MockTransactionStrategy() ;
+
+ @Before
+ public void setup() throws Exception
+ {
+ MockContextFactory.setAsInitial() ;
+ final Context ctx = NamingContextPool.getNamingContext(null);
+ try
+ {
+ ctx.rebind(CONNECTION_FACTORY, new MockQueueConnectionFactory());
+ }
+ finally
+ {
+ NamingContextPool.releaseNamingContext(ctx) ;
+ }
+ TransactionStrategy.setTransactionStrategy(transactionStrategy) ;
+ }
+
+ @After
+ public void tearDown()
+ {
+ TransactionStrategy.setTransactionStrategy(null) ;
+ MockContextFactory.revertSetAsInitial() ;
+ }
+
@Test
public void testPoolAndConnectionCreation() throws Exception
{
JmsConnectionPool jmsConnectionPool = null;
- jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
+ jmsConnectionPool = JmsConnectionPoolContainer.getPool(null, CONNECTION_FACTORY);
assertEquals(0, jmsConnectionPool.getSessionsInPool());
//Open 3 concurrent sessions
JmsSession session1 = jmsConnectionPool.getSession();
@@ -76,7 +110,7 @@
assertEquals(0, jmsConnectionPool.getSessionsInPool());
assertEquals(0, JmsConnectionPoolContainer.getNumberOfPools());
- jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
+ jmsConnectionPool = JmsConnectionPoolContainer.getPool(null, CONNECTION_FACTORY);
jmsConnectionPool.getSession();
assertEquals(1, jmsConnectionPool.getSessionsInPool());
assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
@@ -88,12 +122,14 @@
@Test
public void testCreateSecondPool() throws Exception
{
- JmsConnectionPool jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory");
- jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory");
+ JmsConnectionPool jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(null, CONNECTION_FACTORY);
+ jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(null, CONNECTION_FACTORY);
//This should be the same pool
assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
-
- JmsConnectionPool jmsConnectionPool2 = JmsConnectionPoolContainer.getPool(null, "ConnectionFactory");
+
+ final Properties environment = new Properties() ;
+ environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, MockContextFactory.class.getName());
+ JmsConnectionPool jmsConnectionPool2 = JmsConnectionPoolContainer.getPool(environment, CONNECTION_FACTORY);
//This should be a different pool, so now we should have 2.
assertEquals(2, JmsConnectionPoolContainer.getNumberOfPools());
@@ -108,7 +144,7 @@
@Test
public void testPoolAndSessionsWithAcknowledgeMode() throws Exception
{
- JmsConnectionPool jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
+ JmsConnectionPool jmsConnectionPool = JmsConnectionPoolContainer.getPool(null, CONNECTION_FACTORY);
assertEquals(0, jmsConnectionPool.getSessionsInPool());
JmsSession autoAckSession1 = jmsConnectionPool.getSession(Session.AUTO_ACKNOWLEDGE);
@@ -176,7 +212,7 @@
assertEquals(0, jmsConnectionPool.getSessionsInPool());
assertEquals(0, JmsConnectionPoolContainer.getNumberOfPools());
- jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
+ jmsConnectionPool = JmsConnectionPoolContainer.getPool(null, CONNECTION_FACTORY);
jmsConnectionPool.getSession();
assertEquals(1, jmsConnectionPool.getSessionsInPool());
assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
@@ -184,17 +220,121 @@
jmsConnectionPool.removeSessionPool();
}
- public Properties getEnvironment()
+ public static junit.framework.Test suite()
{
- Properties environment = new Properties();
- environment.setProperty(Context.PROVIDER_URL, Environment.JBOSS_PROVIDER_URL);
- environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, Environment.JBOSS_INITIAL_CONTEXT_FACTORY);
- environment.setProperty(Context.URL_PKG_PREFIXES, Environment.JBOSS_URL_PKG_PREFIX);
- return environment;
+ return new JUnit4TestAdapter(JmsConnectionPoolingIntegrationTest.class);
}
- public static junit.framework.Test suite()
+ private static final class MockQueueConnectionFactory extends QueueConnectionFactoryImpl
{
- return new JUnit4TestAdapter(JmsConnectionPoolingIntegrationTest.class);
+ @Override
+ public QueueConnection createQueueConnection() throws JMSException
+ {
+ return (QueueConnection)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {QueueConnection.class},
+ new MockQueueExceptionHandlerInvocationHandler(super.createQueueConnection())) ;
+ }
}
+
+ private static final class MockQueueExceptionHandlerInvocationHandler implements InvocationHandler
+ {
+ private final QueueConnection queueConnection ;
+ private ExceptionListener exceptionListener ;
+
+ MockQueueExceptionHandlerInvocationHandler(final QueueConnection queueConnection)
+ {
+ this.queueConnection = queueConnection ;
+ }
+
+ public Object invoke(final Object proxy, final Method method, final Object[] args)
+ throws Throwable
+ {
+ final String methodName = method.getName() ;
+ if ("setExceptionListener".equals(methodName))
+ {
+ exceptionListener = (ExceptionListener)args[0] ;
+ return null ;
+ }
+ else if ("getExceptionListener".equals(methodName))
+ {
+ return exceptionListener ;
+ }
+ else
+ {
+ final Object response = method.invoke(queueConnection, args) ;
+// if (response instanceof QueueSession)
+// {
+// final QueueSession queueSession = (QueueSession)response ;
+// return (QueueSession)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {QueueSession.class},
+// new MockQueueSessionInvocationHandler(queueSession)) ;
+// }
+// else
+// {
+ return response ;
+// }
+ }
+ }
+ }
+
+ private static final class MockTransactionStrategy extends TransactionStrategy
+ {
+ private final Object tx = new Object() ;
+ public boolean active ;
+
+ @Override
+ public void begin()
+ throws TransactionStrategyException
+ {
+ }
+
+ @Override
+ public void enlistResource(final XAResource resource)
+ throws TransactionStrategyException
+ {
+ }
+
+ @Override
+ public Object getTransaction()
+ throws TransactionStrategyException
+ {
+ return tx;
+ }
+
+ @Override
+ public boolean isActive()
+ throws TransactionStrategyException
+ {
+ return active;
+ }
+
+ @Override
+ public void registerSynchronization(final Synchronization sync)
+ throws TransactionStrategyException
+ {
+ }
+
+ @Override
+ public void resume(final Object tx)
+ throws TransactionStrategyException
+ {
+ }
+
+ @Override
+ public void rollbackOnly()
+ throws TransactionStrategyException
+ {
+ }
+
+ @Override
+ public Object suspend()
+ throws TransactionStrategyException
+ {
+ return null;
+ }
+
+ @Override
+ public void terminate()
+ throws TransactionStrategyException
+ {
+ }
+ }
}
More information about the jboss-svn-commits
mailing list