[jboss-svn-commits] JBL Code SVN: r25743 - labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Mar 19 11:34:15 EDT 2009
Author: kevin.conner at jboss.com
Date: 2009-03-19 11:34:15 -0400 (Thu, 19 Mar 2009)
New Revision: 25743
Modified:
labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
Log:
Handle suspect sessions: JBESB-2485
Modified: labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2009-03-19 15:28:58 UTC (rev 25742)
+++ labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2009-03-19 15:34:15 UTC (rev 25743)
@@ -161,15 +161,17 @@
private synchronized void addAnotherSession(Map<String, String> poolKey, final boolean transacted, final int acknowledgeMode)
throws JMSException
{
+ final long currentID = id ;
+ final Connection currentConnection = jmsConnection ;
final Future<JmsSession> future = COMPLETION_SERVICE.submit(new Callable<JmsSession>() {
public JmsSession call()
throws JMSException
{
final JmsSession session ;
if (transacted) {
- session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession(), id);
+ session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)currentConnection).createXASession(), currentID);
} else {
- session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode), id);
+ session = new JmsSession(currentConnection.createSession(transacted, acknowledgeMode), currentID);
}
return session ;
}
@@ -217,8 +219,9 @@
catch (final JMSException jmse)
{
/*
- * JBoss Messaging may drop the connection from the server side.
- * We check for IllegalStateException as this appears to be the indicator
+ * JBoss Messaging can drop the connection from the server side
+ * without calling back on the exception listener. We check for
+ * IllegalStateException as this appears to be the indicator
* exception used by JBoss Messaging when the connection has disappeared.
*/
Throwable cause = jmse ;
@@ -337,23 +340,36 @@
*/
synchronized void handleCloseSession(final JmsSession session)
{
- if (session.getId() == id)
+ if (session.isSuspect())
{
- final int mode ;
- try {
- mode = session.getAcknowledgeMode() ;
- } catch (final JMSException jmse) {
- logger.warn("JMSException while calling getAcknowledgeMode") ;
- logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
- return ;
+ logger.debug("Session is suspect, dropping") ;
+ handleReleaseSession(session) ;
+ }
+ else
+ {
+ if (session.getId() != id)
+ {
+ logger.debug("Session is from a previous incarnation, dropping") ;
}
-
- final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
- if (sessions != null) {
- sessions.add(session) ;
+ else
+ {
+ final int mode ;
+ try {
+ mode = session.getAcknowledgeMode() ;
+ } catch (final JMSException jmse) {
+ logger.warn("JMSException while calling getAcknowledgeMode") ;
+ logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
+ return ;
+ }
+
+ final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
+ if (sessions != null) {
+ sessions.add(session) ;
+ }
}
+ session.releaseResources() ;
+ releaseInUseSession(session) ;
}
- handleReleaseSession(session) ;
}
/**
@@ -362,7 +378,21 @@
*/
synchronized void handleReleaseSession(final JmsSession session)
{
- session.releaseResources() ;
+ session.releaseResources();
+ try
+ {
+ session.close() ;
+ }
+ catch (final Throwable th) {} // ignore
+ releaseInUseSession(session) ;
+ }
+
+ /**
+ * Release a session from the in use mapping.
+ * @param session The session to release.
+ */
+ private void releaseInUseSession(final JmsSession session)
+ {
final int mode ;
try {
mode = session.getAcknowledgeMode() ;
Modified: labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java 2009-03-19 15:28:58 UTC (rev 25742)
+++ labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java 2009-03-19 15:34:15 UTC (rev 25743)
@@ -60,6 +60,10 @@
* The session acknowledge mode.
*/
private final int acknowledgeMode ;
+ /**
+ * Flag indicating whether this session is suspect or not.
+ */
+ private boolean suspect ;
/**
* The set of active queue browsers.
@@ -106,7 +110,9 @@
public void commit() throws JMSException
{
+ setSuspect(true) ;
session.commit();
+ setSuspect(false) ;
}
public QueueBrowser createBrowser(Queue arg0, String arg1)
@@ -252,7 +258,9 @@
public void rollback() throws JMSException
{
+ setSuspect(true) ;
session.rollback();
+ setSuspect(false) ;
}
public void run()
@@ -356,11 +364,6 @@
}
messageProducerSet = null ;
}
- try
- {
- recover() ;
- }
- catch (final JMSException jmse) {} // ignore
}
protected QueueBrowser getQueueBrowser(QueueBrowser queueBrowser)
@@ -397,4 +400,14 @@
throws JMSException
{
}
+
+ protected void setSuspect(final boolean suspect)
+ {
+ this.suspect = suspect ;
+ }
+
+ public boolean isSuspect()
+ {
+ return suspect ;
+ }
}
Modified: labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2009-03-19 15:28:58 UTC (rev 25742)
+++ labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2009-03-19 15:34:15 UTC (rev 25743)
@@ -32,6 +32,7 @@
import javax.jms.QueueBrowser;
import javax.jms.TopicSubscriber;
import javax.jms.XASession;
+import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.xa.XAResource;
@@ -162,11 +163,12 @@
if (!associated)
{
cleanupAction = Cleanup.none ;
- final XAResource resource = session.getXAResource() ;
final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
try
{
transactionStrategy.registerSynchronization(this) ;
+ setSuspect(true) ;
+ final XAResource resource = session.getXAResource() ;
transactionStrategy.enlistResource(resource) ;
}
catch (final TransactionStrategyException tse)
@@ -186,6 +188,7 @@
throw ex ;
}
+ setSuspect(false) ;
associated = true ;
}
}
@@ -200,8 +203,12 @@
switch (cleanupAction)
{
case close:
- pool.handleCloseSession(this) ;
- break ;
+ if (result == Status.STATUS_COMMITTED)
+ {
+ pool.handleCloseSession(this) ;
+ break ;
+ }
+ // fall through
case release:
pool.handleReleaseSession(this) ;
break ;
More information about the jboss-svn-commits mailing list