[jboss-cvs] JBossAS SVN: r67638 - trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 29 19:57:14 EST 2007
Author: bdecoste
Date: 2007-11-29 19:57:13 -0500 (Thu, 29 Nov 2007)
New Revision: 67638
Modified:
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java
Log:
[aspatch-240] merge tibco ems ha session handling fixes
Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java 2007-11-30 00:57:04 UTC (rev 67637)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java 2007-11-30 00:57:13 UTC (rev 67638)
@@ -120,6 +120,46 @@
private boolean redeliverUnspecified = true;
+ private int transactionTimeout;
+
+ private Boolean isSameRMOverrideValue;
+
+ private boolean forceClearOnShutdown = false;
+
+ private long forceClearOnShutdownInterval = 1000;
+
+ private int forceClearAttempts = 0;
+
+ public void setForceClearOnShutdown(boolean forceClear)
+ {
+ this.forceClearOnShutdown = forceClear;
+ }
+
+ public boolean isForceClearOnShutdown()
+ {
+ return this.forceClearOnShutdown;
+ }
+
+ public long getForceClearOnShutdownInterval()
+ {
+ return this.forceClearOnShutdownInterval;
+ }
+
+ public void setForceClearOnShutdownInterval(long forceClearOnShutdownInterval)
+ {
+ this.forceClearOnShutdownInterval = forceClearOnShutdownInterval;
+ }
+
+ public int getForceClearAttempts()
+ {
+ return forceClearAttempts;
+ }
+
+ public void setForceClearAttempts(int forceClearAttempts)
+ {
+ this.forceClearAttempts = forceClearAttempts;
+ }
+
/**
* @return the acknowledgeMode.
*/
@@ -670,4 +710,25 @@
{
this.redeliverUnspecified = redeliverUnspecified;
}
+
+ public int getTransactionTimeout()
+ {
+ return transactionTimeout;
+ }
+
+ public void setTransactionTimeout(int transactionTimeout)
+ {
+ this.transactionTimeout = transactionTimeout;
+ }
+
+ public Boolean getIsSameRMOverrideValue()
+ {
+ return isSameRMOverrideValue;
+ }
+
+ public void setIsSameRMOverrideValue(Boolean isSameRMOverrideValue)
+ {
+ this.isSameRMOverrideValue = isSameRMOverrideValue;
+ }
+
}
\ No newline at end of file
Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java 2007-11-30 00:57:04 UTC (rev 67637)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java 2007-11-30 00:57:13 UTC (rev 67638)
@@ -1,24 +1,24 @@
/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.resource.adapter.jms.inflow;
import javax.jms.Connection;
@@ -47,49 +47,50 @@
* A generic jms session pool.
*
* @author <a href="adrian at jboss.com">Adrian Brock</a>
- * @author <a href="mailto:weston.price at jboss.com>Weston Price</a>
+ * @author <a href="mailto:weston.price at jboss.com>Weston Price</a>
* @version $Revision$
*/
-public class JmsServerSession implements ServerSession, MessageListener, Work, WorkListener
+public class JmsServerSession implements ServerSession, MessageListener, Work,
+ WorkListener
{
/** The log */
private static final Logger log = Logger.getLogger(JmsServerSession.class);
-
+
/** The session pool */
JmsServerSessionPool pool;
-
+
/** The transacted flag */
boolean transacted;
-
+
/** The acknowledge mode */
int acknowledge;
-
+
/** The session */
Session session;
-
+
/** Any XA session */
XASession xaSession;
-
+
/** The endpoint */
MessageEndpoint endpoint;
-
+
/** Any DLQ handler */
DLQHandler dlqHandler;
-
+
TransactionDemarcationStrategy txnStrategy;
-
-
+
/**
* Create a new JmsServerSession
*
- * @param pool the server session pool
+ * @param pool
+ * the server session pool
*/
public JmsServerSession(JmsServerSessionPool pool)
{
this.pool = pool;
-
+
}
-
+
/**
* Setup the session
*/
@@ -99,35 +100,36 @@
JmsActivationSpec spec = activation.getActivationSpec();
dlqHandler = activation.getDLQHandler();
-
+
Connection connection = activation.getConnection();
// Create the session
- if (connection instanceof XAConnection && activation.isDeliveryTransacted())
+ if (connection instanceof XAConnection
+ && activation.isDeliveryTransacted())
{
xaSession = ((XAConnection) connection).createXASession();
session = xaSession.getSession();
- }
- else
- {
- transacted = spec.isSessionTransacted();
- acknowledge = spec.getAcknowledgeModeInt();
+ } else
+ {
+ transacted = spec.isSessionTransacted();
+ acknowledge = spec.getAcknowledgeModeInt();
session = connection.createSession(transacted, acknowledge);
}
-
+
// Get the endpoint
- MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
+ MessageEndpointFactory endpointFactory = activation
+ .getMessageEndpointFactory();
XAResource xaResource = null;
if (activation.isDeliveryTransacted() && xaSession != null)
xaResource = xaSession.getXAResource();
-
+
endpoint = endpointFactory.createEndpoint(xaResource);
-
+
// Set the message listener
session.setMessageListener(this);
}
-
+
/**
* Stop the session
*/
@@ -137,8 +139,7 @@
{
if (endpoint != null)
endpoint.release();
- }
- catch (Throwable t)
+ } catch (Throwable t)
{
log.debug("Error releasing endpoint " + endpoint, t);
}
@@ -147,8 +148,7 @@
{
if (xaSession != null)
xaSession.close();
- }
- catch (Throwable t)
+ } catch (Throwable t)
{
log.debug("Error releasing xaSession " + xaSession, t);
}
@@ -157,31 +157,30 @@
{
if (session != null)
session.close();
- }
- catch (Throwable t)
+ } catch (Throwable t)
{
log.debug("Error releasing session " + session, t);
}
}
-
+
public void onMessage(Message message)
{
try
{
endpoint.beforeDelivery(JmsActivation.ONMESSAGE);
-
+
try
{
- if (dlqHandler == null || dlqHandler.handleRedeliveredMessage(message) == false)
+ if (dlqHandler == null
+ || dlqHandler.handleRedeliveredMessage(message) == false)
{
- MessageListener listener = (MessageListener)endpoint;
+ MessageListener listener = (MessageListener) endpoint;
listener.onMessage(message);
}
- }
- finally
+ } finally
{
endpoint.afterDelivery();
-
+
if (dlqHandler != null)
dlqHandler.messageDelivered(message);
}
@@ -190,13 +189,12 @@
catch (Throwable t)
{
log.error("Unexpected error delivering message " + message, t);
-
- if(txnStrategy != null)
+
+ if (txnStrategy != null)
txnStrategy.error();
-
+
}
-
-
+
}
public Session getSession() throws JMSException
@@ -206,13 +204,12 @@
public void start() throws JMSException
{
- JmsActivation activation = pool.getActivation();
+ JmsActivation activation = pool.getActivation();
WorkManager workManager = activation.getWorkManager();
try
{
workManager.scheduleWork(this, 0, null, this);
- }
- catch (WorkException e)
+ } catch (WorkException e)
{
log.error("Unable to schedule work", e);
throw new JMSException("Unable to schedule work: " + e.toString());
@@ -221,46 +218,45 @@
public void run()
{
-
+
try
{
txnStrategy = createTransactionDemarcation();
-
- }catch(Throwable t)
+
+ } catch (Throwable t)
{
log.error("Error creating transaction demarcation. Cannot continue.");
return;
}
-
-
+
try
- {
+ {
session.run();
- }
- catch(Throwable t)
+ } catch (Throwable t)
{
if (txnStrategy != null)
txnStrategy.error();
-
- }finally
+
+ } finally
{
- if(txnStrategy != null)
+ if (txnStrategy != null)
txnStrategy.end();
txnStrategy = null;
}
-
+
}
-
+
private TransactionDemarcationStrategy createTransactionDemarcation()
{
return new DemarcationStrategyFactory().getStrategy();
-
+
}
+
public void release()
{
}
-
+
public void workAccepted(WorkEvent e)
{
}
@@ -275,154 +271,170 @@
pool.returnServerSession(this);
}
-
public void workStarted(WorkEvent e)
{
}
-
+
private class DemarcationStrategyFactory
{
-
+
TransactionDemarcationStrategy getStrategy()
{
TransactionDemarcationStrategy current = null;
- final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
+ final JmsActivationSpec spec = pool.getActivation()
+ .getActivationSpec();
final JmsActivation activation = pool.getActivation();
-
- if(activation.isDeliveryTransacted() && xaSession != null)
+
+ if (activation.isDeliveryTransacted() && xaSession != null)
{
try
{
current = new XATransactionDemarcationStrategy();
- }
- catch (Throwable t)
+ } catch (Throwable t)
{
log.error(this + " error creating transaction demarcation ", t);
- }
-
- }else
+ }
+
+ } else
{
-
- return new LocalDemarcationStrategy();
-
+
+ return new LocalDemarcationStrategy();
+
}
-
+
return current;
}
-
+
}
+
private interface TransactionDemarcationStrategy
{
void error();
+
void end();
-
+
}
-
- private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
+
+ private class LocalDemarcationStrategy implements
+ TransactionDemarcationStrategy
{
public void end()
{
- final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
-
- if(spec.isSessionTransacted())
+ final JmsActivationSpec spec = pool.getActivation()
+ .getActivationSpec();
+
+ if (spec.isSessionTransacted())
{
- if(session != null)
+ if (session != null)
{
try
{
session.commit();
- }
- catch (JMSException e)
+ } catch (JMSException e)
{
log.error("Failed to commit session transaction", e);
}
}
}
}
-
+
public void error()
{
- final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
-
- if(spec.isSessionTransacted())
+ final JmsActivationSpec spec = pool.getActivation()
+ .getActivationSpec();
+
+ if (spec.isSessionTransacted())
{
- if(session != null)
-
+ if (session != null)
+
try
{
/*
* Looks strange, but this basically means
*
- * If the underlying connection was non-XA and the transaction attribute is REQUIRED
- * we rollback. Also, if the underlying connection was non-XA and the transaction
- * attribute is NOT_SUPPORT and the non standard redelivery behavior is enabled
- * we rollback to force redelivery.
+ * If the underlying connection was non-XA and the transaction
+ * attribute is REQUIRED we rollback. Also, if the underlying
+ * connection was non-XA and the transaction attribute is
+ * NOT_SUPPORT and the non standard redelivery behavior is
+ * enabled we rollback to force redelivery.
*
*/
- if(pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified())
+ if (pool.getActivation().isDeliveryTransacted()
+ || spec.getRedeliverUnspecified())
{
- session.rollback();
+ session.rollback();
}
-
- }
- catch (JMSException e)
+
+ } catch (JMSException e)
{
log.error("Failed to rollback session transaction", e);
}
-
+
}
}
-
+
}
- private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
+ private class XATransactionDemarcationStrategy implements
+ TransactionDemarcationStrategy
{
-
- boolean trace = log.isTraceEnabled();
-
+
+ boolean trace = log.isTraceEnabled();
+
Transaction trans = null;
+
TransactionManager tm = pool.getActivation().getTransactionManager();;
-
+
public XATransactionDemarcationStrategy() throws Throwable
{
-
- tm.begin();
+ final int timeout = pool.getActivation().getActivationSpec()
+ .getTransactionTimeout();
- try
- {
- trans = tm.getTransaction();
+ if (timeout > 0)
+ {
+ log.trace("Setting transactionTimeout for JMSSessionPool to "
+ + timeout);
+ tm.setTransactionTimeout(timeout);
- if (trace)
- log.trace(JmsServerSession.this + " using tx=" + trans);
+ }
- if (xaSession != null)
- {
- XAResource res = xaSession.getXAResource();
+ tm.begin();
- if (!trans.enlistResource(res))
- {
- throw new JMSException("could not enlist resource");
- }
- if (trace)
- log.trace(JmsServerSession.this + " XAResource '" + res + "' enlisted.");
- }
- }
- catch (Throwable t)
+ try
+ {
+ trans = tm.getTransaction();
+
+ if (trace)
+ log.trace(JmsServerSession.this + " using tx=" + trans);
+
+ if (xaSession != null)
{
- try
+ XAResource res = xaSession.getXAResource();
+
+ if (!trans.enlistResource(res))
{
- tm.rollback();
+ throw new JMSException("could not enlist resource");
}
- catch (Throwable ignored)
- {
- log.trace(JmsServerSession.this + " ignored error rolling back after failed enlist", ignored);
- }
- throw t;
+ if (trace)
+ log.trace(JmsServerSession.this + " XAResource '" + res
+ + "' enlisted.");
}
+ } catch (Throwable t)
+ {
+ try
+ {
+ tm.rollback();
+ } catch (Throwable ignored)
+ {
+ log.trace(JmsServerSession.this
+ + " ignored error rolling back after failed enlist",
+ ignored);
+ }
+ throw t;
+ }
- }
-
-
+ }
+
public void error()
{
// Mark for tollback TX via TM
@@ -430,38 +442,45 @@
{
if (trace)
- log.trace(JmsServerSession.this + " using TM to mark TX for rollback tx=" + trans);
+ log.trace(JmsServerSession.this
+ + " using TM to mark TX for rollback tx=" + trans);
trans.setRollbackOnly();
- }
- catch (Throwable t)
+ } catch (Throwable t)
{
- log.error(JmsServerSession.this + " failed to set rollback only", t);
+ log
+ .error(
+ JmsServerSession.this + " failed to set rollback only",
+ t);
}
}
-
+
public void end()
{
try
{
- // Use the TM to commit the Tx (assert the correct association)
+ // Use the TM to commit the Tx (assert the correct association)
Transaction currentTx = tm.getTransaction();
if (trans.equals(currentTx) == false)
- throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
+ throw new IllegalStateException(
+ "Wrong tx association: expected " + trans + " was "
+ + currentTx);
// Marked rollback
if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
{
if (trace)
- log.trace(JmsServerSession.this + " rolling back JMS transaction tx=" + trans);
+ log.trace(JmsServerSession.this
+ + " rolling back JMS transaction tx=" + trans);
// actually roll it back
tm.rollback();
// NO XASession? then manually rollback.
// This is not so good but
// it's the best we can do if we have no XASession.
- if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+ if (xaSession == null
+ && pool.getActivation().isDeliveryTransacted())
{
session.rollback();
}
@@ -474,30 +493,31 @@
// a) everything goes well
// b) app. exception was thrown
if (trace)
- log.trace(JmsServerSession.this + " commiting the JMS transaction tx=" + trans);
+ log.trace(JmsServerSession.this
+ + " commiting the JMS transaction tx=" + trans);
tm.commit();
- // NO XASession? then manually commit. This is not so good but
+ // NO XASession? then manually commit. This is not so good but
// it's the best we can do if we have no XASession.
- if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+ if (xaSession == null
+ && pool.getActivation().isDeliveryTransacted())
{
session.commit();
}
-
- }else
+
+ } else
{
tm.suspend();
-
- if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+
+ if (xaSession == null
+ && pool.getActivation().isDeliveryTransacted())
{
session.rollback();
}
-
-
+
}
- }
- catch (Throwable t)
+ } catch (Throwable t)
{
log.error(JmsServerSession.this + " failed to commit/rollback", t);
}
Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java 2007-11-30 00:57:04 UTC (rev 67637)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java 2007-11-30 00:57:13 UTC (rev 67638)
@@ -220,15 +220,41 @@
sessionCount -= serverSessions.size();
serverSessions.clear();
- // Wait for inuse sessions
- while (sessionCount > 0)
- {
- try
+ if (activation.getActivationSpec().isForceClearOnShutdown())
+ {
+ int attempts = 0;
+ int forceClearAttempts = activation.getActivationSpec().getForceClearAttempts();
+ long forceClearInterval = activation.getActivationSpec().getForceClearOnShutdownInterval();
+
+ log.info("Force clear behavior in effect. Waiting for " + forceClearInterval + " milliseconds for " + forceClearAttempts + " attempts.");
+
+ while((sessionCount > 0) && (attempts < forceClearAttempts))
{
- serverSessions.wait();
+ try
+ {
+ serverSessions.wait(forceClearInterval);
+ log.trace("Clear attempt " + attempts);
+ ++attempts;
+
+ }catch(InterruptedException ignore)
+ {
+
+ }
+
}
- catch (InterruptedException ignore)
+ }
+ else
+ {
+ // Wait for inuse sessions
+ while (sessionCount > 0)
{
+ try
+ {
+ serverSessions.wait();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
}
}
}
More information about the jboss-cvs-commits
mailing list