Bill, JBossAS is an open source project.
What is this patch?
1) Why is the first time I see this change in a commit message?
There's absolutely no discussion about this change anywhere.
This is where you discuss changes to JCA:
BEFORE YOU COMMIT.
2) Where is the JBAS-XXXX for this so it appears in the release notes?
Why does the commit message relate to a patch to an old branch then
modify a development branch?
3) These changes are not documentated anywhere, including the
WIKI page for the activation config properties of the jms resource
adapter:
4) Where are the tests for this new behaviour?
Not tested == doesn't work
Not tested == even if it appeared to work, it will almost certainly
unwittingly get broken by somebody in the future
5) The main purpose of this change appears to be to screw up the
whitespace. Making it hard to understand what has been changed. :-)
Please fix the formatter in your ide. Or if you have to
reformat, do it in a (null) seperate commit so the real patch
is easy to understand.
The change seems to be to introduce new activation config properties.
I'll deal with each individually.
This feedback should have been sought BEFORE COMMITTING.
a) transactionTimeout
This looks like a complete misunderstanding by Weston.
This should be coming from the method attributes in jboss.xml
b) isSameRMOverrideValue
This isn't used anywhere except the activation spec?????
[ejort@warjort jms]$ grep -ri isSameRMOverride * | grep -v svn
inflow/JmsActivationSpec.java: private Boolean isSameRMOverrideValue;
inflow/JmsActivationSpec.java: public Boolean
getIsSameRMOverrideValue()
inflow/JmsActivationSpec.java: return isSameRMOverrideValue;
inflow/JmsActivationSpec.java: public void
setIsSameRMOverrideValue(Boolean isSameRMOverrideValue)
inflow/JmsActivationSpec.java: this.isSameRMOverrideValue =
isSameRMOverrideValue;
c) forceClearOnShutdown, et. al.
The idea looks good, but the implementation is rubbish.
It shows a fundamental misunderstooding on how the pool works.
The notification happens once for each session that ends
(and hence so does your loop).
In practice with the default config, it will leave
the loop after the first session ends (almost immediately) when in the
default config there are still 14 other sessions to go.
A 1 second default wait is also unrealistic. Many people have MDBs
that take longer than this to process something.
In fact, this should be the default behaviour (but with different
configuration). Nothing should wait forever! :-)
The defaults should be specified in conf/standardjboss.xml
and the ejb3 equivalent aop interceptors file such that
users can easily change them.
In future (and some of it for this change):
* Please discuss before committing
* Raise JBAS JIRA issues so the release notes are up-to-date
* At least update the WIKI to describe the new features
* Write some tests
* Don't play with whitespace in real commits
On Thu, 2007-11-29 at 19:57 -0500, jboss-cvs-commits(a)lists.jboss.org
wrote:
Author: bdecoste
Date: 2007-11-29 19:57:13 -0500 (Thu, 29 Nov 2007)
New Revision: 67638
Modified:
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java
Log:
[aspatch-240] merge tibco ems ha session handling fixes
Modified:
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
===================================================================
---
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java 2007-11-30
00:57:04 UTC (rev 67637)
+++
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java 2007-11-30
00:57:13 UTC (rev 67638)
@@ -120,6 +120,46 @@
private boolean redeliverUnspecified = true;
+ private int transactionTimeout;
+
+ private Boolean isSameRMOverrideValue;
+
+ private boolean forceClearOnShutdown = false;
+
+ private long forceClearOnShutdownInterval = 1000;
+
+ private int forceClearAttempts = 0;
+
+ public void setForceClearOnShutdown(boolean forceClear)
+ {
+ this.forceClearOnShutdown = forceClear;
+ }
+
+ public boolean isForceClearOnShutdown()
+ {
+ return this.forceClearOnShutdown;
+ }
+
+ public long getForceClearOnShutdownInterval()
+ {
+ return this.forceClearOnShutdownInterval;
+ }
+
+ public void setForceClearOnShutdownInterval(long forceClearOnShutdownInterval)
+ {
+ this.forceClearOnShutdownInterval = forceClearOnShutdownInterval;
+ }
+
+ public int getForceClearAttempts()
+ {
+ return forceClearAttempts;
+ }
+
+ public void setForceClearAttempts(int forceClearAttempts)
+ {
+ this.forceClearAttempts = forceClearAttempts;
+ }
+
/**
* @return the acknowledgeMode.
*/
@@ -670,4 +710,25 @@
{
this.redeliverUnspecified = redeliverUnspecified;
}
+
+ public int getTransactionTimeout()
+ {
+ return transactionTimeout;
+ }
+
+ public void setTransactionTimeout(int transactionTimeout)
+ {
+ this.transactionTimeout = transactionTimeout;
+ }
+
+ public Boolean getIsSameRMOverrideValue()
+ {
+ return isSameRMOverrideValue;
+ }
+
+ public void setIsSameRMOverrideValue(Boolean isSameRMOverrideValue)
+ {
+ this.isSameRMOverrideValue = isSameRMOverrideValue;
+ }
+
}
\ No newline at end of file
Modified:
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
===================================================================
---
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java 2007-11-30
00:57:04 UTC (rev 67637)
+++
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java 2007-11-30
00:57:13 UTC (rev 67638)
@@ -1,24 +1,24 @@
/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
package org.jboss.resource.adapter.jms.inflow;
import javax.jms.Connection;
@@ -47,49 +47,50 @@
* A generic jms session pool.
*
* @author <a href="adrian(a)jboss.com">Adrian Brock</a>
- * @author <a href="mailto:weston.price@jboss.com>Weston Price</a>
+ * @author <a href="mailto:weston.price@jboss.com>Weston Price</a>
* @version $Revision$
*/
-public class JmsServerSession implements ServerSession, MessageListener, Work,
WorkListener
+public class JmsServerSession implements ServerSession, MessageListener, Work,
+ WorkListener
{
/** The log */
private static final Logger log = Logger.getLogger(JmsServerSession.class);
-
+
/** The session pool */
JmsServerSessionPool pool;
-
+
/** The transacted flag */
boolean transacted;
-
+
/** The acknowledge mode */
int acknowledge;
-
+
/** The session */
Session session;
-
+
/** Any XA session */
XASession xaSession;
-
+
/** The endpoint */
MessageEndpoint endpoint;
-
+
/** Any DLQ handler */
DLQHandler dlqHandler;
-
+
TransactionDemarcationStrategy txnStrategy;
-
-
+
/**
* Create a new JmsServerSession
*
- * @param pool the server session pool
+ * @param pool
+ * the server session pool
*/
public JmsServerSession(JmsServerSessionPool pool)
{
this.pool = pool;
-
+
}
-
+
/**
* Setup the session
*/
@@ -99,35 +100,36 @@
JmsActivationSpec spec = activation.getActivationSpec();
dlqHandler = activation.getDLQHandler();
-
+
Connection connection = activation.getConnection();
// Create the session
- if (connection instanceof XAConnection &&
activation.isDeliveryTransacted())
+ if (connection instanceof XAConnection
+ && activation.isDeliveryTransacted())
{
xaSession = ((XAConnection) connection).createXASession();
session = xaSession.getSession();
- }
- else
- {
- transacted = spec.isSessionTransacted();
- acknowledge = spec.getAcknowledgeModeInt();
+ } else
+ {
+ transacted = spec.isSessionTransacted();
+ acknowledge = spec.getAcknowledgeModeInt();
session = connection.createSession(transacted, acknowledge);
}
-
+
// Get the endpoint
- MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
+ MessageEndpointFactory endpointFactory = activation
+ .getMessageEndpointFactory();
XAResource xaResource = null;
if (activation.isDeliveryTransacted() && xaSession != null)
xaResource = xaSession.getXAResource();
-
+
endpoint = endpointFactory.createEndpoint(xaResource);
-
+
// Set the message listener
session.setMessageListener(this);
}
-
+
/**
* Stop the session
*/
@@ -137,8 +139,7 @@
{
if (endpoint != null)
endpoint.release();
- }
- catch (Throwable t)
+ } catch (Throwable t)
{
log.debug("Error releasing endpoint " + endpoint, t);
}
@@ -147,8 +148,7 @@
{
if (xaSession != null)
xaSession.close();
- }
- catch (Throwable t)
+ } catch (Throwable t)
{
log.debug("Error releasing xaSession " + xaSession, t);
}
@@ -157,31 +157,30 @@
{
if (session != null)
session.close();
- }
- catch (Throwable t)
+ } catch (Throwable t)
{
log.debug("Error releasing session " + session, t);
}
}
-
+
public void onMessage(Message message)
{
try
{
endpoint.beforeDelivery(JmsActivation.ONMESSAGE);
-
+
try
{
- if (dlqHandler == null || dlqHandler.handleRedeliveredMessage(message) ==
false)
+ if (dlqHandler == null
+ || dlqHandler.handleRedeliveredMessage(message) == false)
{
- MessageListener listener = (MessageListener)endpoint;
+ MessageListener listener = (MessageListener) endpoint;
listener.onMessage(message);
}
- }
- finally
+ } finally
{
endpoint.afterDelivery();
-
+
if (dlqHandler != null)
dlqHandler.messageDelivered(message);
}
@@ -190,13 +189,12 @@
catch (Throwable t)
{
log.error("Unexpected error delivering message " + message, t);
-
- if(txnStrategy != null)
+
+ if (txnStrategy != null)
txnStrategy.error();
-
+
}
-
-
+
}
public Session getSession() throws JMSException
@@ -206,13 +204,12 @@
public void start() throws JMSException
{
- JmsActivation activation = pool.getActivation();
+ JmsActivation activation = pool.getActivation();
WorkManager workManager = activation.getWorkManager();
try
{
workManager.scheduleWork(this, 0, null, this);
- }
- catch (WorkException e)
+ } catch (WorkException e)
{
log.error("Unable to schedule work", e);
throw new JMSException("Unable to schedule work: " + e.toString());
@@ -221,46 +218,45 @@
public void run()
{
-
+
try
{
txnStrategy = createTransactionDemarcation();
-
- }catch(Throwable t)
+
+ } catch (Throwable t)
{
log.error("Error creating transaction demarcation. Cannot
continue.");
return;
}
-
-
+
try
- {
+ {
session.run();
- }
- catch(Throwable t)
+ } catch (Throwable t)
{
if (txnStrategy != null)
txnStrategy.error();
-
- }finally
+
+ } finally
{
- if(txnStrategy != null)
+ if (txnStrategy != null)
txnStrategy.end();
txnStrategy = null;
}
-
+
}
-
+
private TransactionDemarcationStrategy createTransactionDemarcation()
{
return new DemarcationStrategyFactory().getStrategy();
-
+
}
+
public void release()
{
}
-
+
public void workAccepted(WorkEvent e)
{
}
@@ -275,154 +271,170 @@
pool.returnServerSession(this);
}
-
public void workStarted(WorkEvent e)
{
}
-
+
private class DemarcationStrategyFactory
{
-
+
TransactionDemarcationStrategy getStrategy()
{
TransactionDemarcationStrategy current = null;
- final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
+ final JmsActivationSpec spec = pool.getActivation()
+ .getActivationSpec();
final JmsActivation activation = pool.getActivation();
-
- if(activation.isDeliveryTransacted() && xaSession != null)
+
+ if (activation.isDeliveryTransacted() && xaSession != null)
{
try
{
current = new XATransactionDemarcationStrategy();
- }
- catch (Throwable t)
+ } catch (Throwable t)
{
log.error(this + " error creating transaction demarcation ",
t);
- }
-
- }else
+ }
+
+ } else
{
-
- return new LocalDemarcationStrategy();
-
+
+ return new LocalDemarcationStrategy();
+
}
-
+
return current;
}
-
+
}
+
private interface TransactionDemarcationStrategy
{
void error();
+
void end();
-
+
}
-
- private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
+
+ private class LocalDemarcationStrategy implements
+ TransactionDemarcationStrategy
{
public void end()
{
- final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
-
- if(spec.isSessionTransacted())
+ final JmsActivationSpec spec = pool.getActivation()
+ .getActivationSpec();
+
+ if (spec.isSessionTransacted())
{
- if(session != null)
+ if (session != null)
{
try
{
session.commit();
- }
- catch (JMSException e)
+ } catch (JMSException e)
{
log.error("Failed to commit session transaction", e);
}
}
}
}
-
+
public void error()
{
- final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
-
- if(spec.isSessionTransacted())
+ final JmsActivationSpec spec = pool.getActivation()
+ .getActivationSpec();
+
+ if (spec.isSessionTransacted())
{
- if(session != null)
-
+ if (session != null)
+
try
{
/*
* Looks strange, but this basically means
*
- * If the underlying connection was non-XA and the transaction
attribute is REQUIRED
- * we rollback. Also, if the underlying connection was non-XA and the
transaction
- * attribute is NOT_SUPPORT and the non standard redelivery behavior
is enabled
- * we rollback to force redelivery.
+ * If the underlying connection was non-XA and the transaction
+ * attribute is REQUIRED we rollback. Also, if the underlying
+ * connection was non-XA and the transaction attribute is
+ * NOT_SUPPORT and the non standard redelivery behavior is
+ * enabled we rollback to force redelivery.
*
*/
- if(pool.getActivation().isDeliveryTransacted() ||
spec.getRedeliverUnspecified())
+ if (pool.getActivation().isDeliveryTransacted()
+ || spec.getRedeliverUnspecified())
{
- session.rollback();
+ session.rollback();
}
-
- }
- catch (JMSException e)
+
+ } catch (JMSException e)
{
log.error("Failed to rollback session transaction", e);
}
-
+
}
}
-
+
}
- private class XATransactionDemarcationStrategy implements
TransactionDemarcationStrategy
+ private class XATransactionDemarcationStrategy implements
+ TransactionDemarcationStrategy
{
-
- boolean trace = log.isTraceEnabled();
-
+
+ boolean trace = log.isTraceEnabled();
+
Transaction trans = null;
+
TransactionManager tm = pool.getActivation().getTransactionManager();;
-
+
public XATransactionDemarcationStrategy() throws Throwable
{
-
- tm.begin();
+ final int timeout = pool.getActivation().getActivationSpec()
+ .getTransactionTimeout();
- try
- {
- trans = tm.getTransaction();
+ if (timeout > 0)
+ {
+ log.trace("Setting transactionTimeout for JMSSessionPool to "
+ + timeout);
+ tm.setTransactionTimeout(timeout);
- if (trace)
- log.trace(JmsServerSession.this + " using tx=" + trans);
+ }
- if (xaSession != null)
- {
- XAResource res = xaSession.getXAResource();
+ tm.begin();
- if (!trans.enlistResource(res))
- {
- throw new JMSException("could not enlist resource");
- }
- if (trace)
- log.trace(JmsServerSession.this + " XAResource '" +
res + "' enlisted.");
- }
- }
- catch (Throwable t)
+ try
+ {
+ trans = tm.getTransaction();
+
+ if (trace)
+ log.trace(JmsServerSession.this + " using tx=" + trans);
+
+ if (xaSession != null)
{
- try
+ XAResource res = xaSession.getXAResource();
+
+ if (!trans.enlistResource(res))
{
- tm.rollback();
+ throw new JMSException("could not enlist resource");
}
- catch (Throwable ignored)
- {
- log.trace(JmsServerSession.this + " ignored error rolling back
after failed enlist", ignored);
- }
- throw t;
+ if (trace)
+ log.trace(JmsServerSession.this + " XAResource '" + res
+ + "' enlisted.");
}
+ } catch (Throwable t)
+ {
+ try
+ {
+ tm.rollback();
+ } catch (Throwable ignored)
+ {
+ log.trace(JmsServerSession.this
+ + " ignored error rolling back after failed enlist",
+ ignored);
+ }
+ throw t;
+ }
- }
-
-
+ }
+
public void error()
{
// Mark for tollback TX via TM
@@ -430,38 +442,45 @@
{
if (trace)
- log.trace(JmsServerSession.this + " using TM to mark TX for rollback
tx=" + trans);
+ log.trace(JmsServerSession.this
+ + " using TM to mark TX for rollback tx=" + trans);
trans.setRollbackOnly();
- }
- catch (Throwable t)
+ } catch (Throwable t)
{
- log.error(JmsServerSession.this + " failed to set rollback only",
t);
+ log
+ .error(
+ JmsServerSession.this + " failed to set rollback
only",
+ t);
}
}
-
+
public void end()
{
try
{
- // Use the TM to commit the Tx (assert the correct association)
+ // Use the TM to commit the Tx (assert the correct association)
Transaction currentTx = tm.getTransaction();
if (trans.equals(currentTx) == false)
- throw new IllegalStateException("Wrong tx association: expected
" + trans + " was " + currentTx);
+ throw new IllegalStateException(
+ "Wrong tx association: expected " + trans + " was
"
+ + currentTx);
// Marked rollback
if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
{
if (trace)
- log.trace(JmsServerSession.this + " rolling back JMS transaction
tx=" + trans);
+ log.trace(JmsServerSession.this
+ + " rolling back JMS transaction tx=" + trans);
// actually roll it back
tm.rollback();
// NO XASession? then manually rollback.
// This is not so good but
// it's the best we can do if we have no XASession.
- if (xaSession == null &&
pool.getActivation().isDeliveryTransacted())
+ if (xaSession == null
+ && pool.getActivation().isDeliveryTransacted())
{
session.rollback();
}
@@ -474,30 +493,31 @@
// a) everything goes well
// b) app. exception was thrown
if (trace)
- log.trace(JmsServerSession.this + " commiting the JMS transaction
tx=" + trans);
+ log.trace(JmsServerSession.this
+ + " commiting the JMS transaction tx=" + trans);
tm.commit();
- // NO XASession? then manually commit. This is not so good but
+ // NO XASession? then manually commit. This is not so good but
// it's the best we can do if we have no XASession.
- if (xaSession == null &&
pool.getActivation().isDeliveryTransacted())
+ if (xaSession == null
+ && pool.getActivation().isDeliveryTransacted())
{
session.commit();
}
-
- }else
+
+ } else
{
tm.suspend();
-
- if (xaSession == null &&
pool.getActivation().isDeliveryTransacted())
+
+ if (xaSession == null
+ && pool.getActivation().isDeliveryTransacted())
{
session.rollback();
}
-
-
+
}
- }
- catch (Throwable t)
+ } catch (Throwable t)
{
log.error(JmsServerSession.this + " failed to commit/rollback",
t);
}
Modified:
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java
===================================================================
---
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java 2007-11-30
00:57:04 UTC (rev 67637)
+++
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java 2007-11-30
00:57:13 UTC (rev 67638)
@@ -220,15 +220,41 @@
sessionCount -= serverSessions.size();
serverSessions.clear();
- // Wait for inuse sessions
- while (sessionCount > 0)
- {
- try
+ if (activation.getActivationSpec().isForceClearOnShutdown())
+ {
+ int attempts = 0;
+ int forceClearAttempts =
activation.getActivationSpec().getForceClearAttempts();
+ long forceClearInterval =
activation.getActivationSpec().getForceClearOnShutdownInterval();
+
+ log.info("Force clear behavior in effect. Waiting for " +
forceClearInterval + " milliseconds for " + forceClearAttempts + "
attempts.");
+
+ while((sessionCount > 0) && (attempts < forceClearAttempts))
{
- serverSessions.wait();
+ try
+ {
+ serverSessions.wait(forceClearInterval);
+ log.trace("Clear attempt " + attempts);
+ ++attempts;
+
+ }catch(InterruptedException ignore)
+ {
+
+ }
+
}
- catch (InterruptedException ignore)
+ }
+ else
+ {
+ // Wait for inuse sessions
+ while (sessionCount > 0)
{
+ try
+ {
+ serverSessions.wait();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
}
}
}
_______________________________________________
jboss-cvs-commits mailing list
jboss-cvs-commits(a)lists.jboss.org
https://lists.jboss.org/mailman/listinfo/jboss-cvs-commits --
xxxxxxxxxxxxxxxxxxxxxxxxxxxx
Adrian Brock
Chief Scientist
JBoss, a division of Red Hat
xxxxxxxxxxxxxxxxxxxxxxxxxxxx