[jboss-cvs] JBossAS SVN: r60397 - branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Feb 7 14:25:23 EST 2007
Author: weston.price at jboss.com
Date: 2007-02-07 14:25:22 -0500 (Wed, 07 Feb 2007)
New Revision: 60397
Modified:
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
Log:
[JBAS-1434][JBAS-3321] Removed reconnect attempts optimization. Added setTransactionTimeout
to activation spec and JmsServerSessionPool.
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java 2007-02-07 18:31:53 UTC (rev 60396)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java 2007-02-07 19:25:22 UTC (rev 60397)
@@ -229,9 +229,8 @@
public void handleFailure(Throwable failure)
{
log.warn("Failure in jms activation " + spec, failure);
- int reconnectCount = 0;
- while (deliveryActive.get() && reconnectCount < spec.getReconnectAttempts())
+ while (deliveryActive.get())
{
teardown();
try
@@ -256,7 +255,6 @@
log.error("Unable to reconnect " + spec, t);
}
- ++reconnectCount;
}
}
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java 2007-02-07 18:31:53 UTC (rev 60396)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java 2007-02-07 19:25:22 UTC (rev 60397)
@@ -115,12 +115,11 @@
/** The DLQ max resent */
private int dLQMaxResent = 5;
- //Default to 5 attempts
- private int reconnectAttempts = 5;
-
//Used to specify whether or not we should attempt to redeliver a message in an unspecified txn context
private boolean redeliverUnspecified = true;
+ private int transactionTimeout;
+
/**
* @return the acknowledgeMode.
*/
@@ -652,23 +651,26 @@
return buffer.toString();
}
- public int getReconnectAttempts()
+ public boolean getRedeliverUnspecified()
{
- return reconnectAttempts;
+ return redeliverUnspecified;
}
- public void setReconnectAttempts(int reconnectAttempts)
+ public void setRedeliverUnspecified(boolean redeliverUnspecified)
{
- this.reconnectAttempts = reconnectAttempts;
+ this.redeliverUnspecified = redeliverUnspecified;
}
- public boolean getRedeliverUnspecified()
+ public int getTransactionTimeout()
{
- return redeliverUnspecified;
+ return transactionTimeout;
}
- public void setRedeliverUnspecified(boolean redeliverUnspecified)
+ public void setTransactionTimeout(int transactionTimeout)
{
- this.redeliverUnspecified = redeliverUnspecified;
+ this.transactionTimeout = transactionTimeout;
}
+
+
+
}
\ No newline at end of file
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java 2007-02-07 18:31:53 UTC (rev 60396)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java 2007-02-07 19:25:22 UTC (rev 60397)
@@ -55,31 +55,30 @@
{
/** The log */
private static final Logger log = Logger.getLogger(JmsServerSession.class);
-
+
/** The session pool */
JmsServerSessionPool pool;
-
+
/** The transacted flag */
boolean transacted;
-
+
/** The acknowledge mode */
int acknowledge;
-
+
/** The session */
Session session;
-
+
/** Any XA session */
XASession xaSession;
-
+
/** The endpoint */
MessageEndpoint endpoint;
-
+
/** Any DLQ handler */
DLQHandler dlqHandler;
-
+
TransactionDemarcationStrategy txnStrategy;
-
-
+
/**
* Create a new JmsServerSession
*
@@ -88,9 +87,9 @@
public JmsServerSession(JmsServerSessionPool pool)
{
this.pool = pool;
-
+
}
-
+
/**
* Setup the session
*/
@@ -100,7 +99,7 @@
JmsActivationSpec spec = activation.getActivationSpec();
dlqHandler = activation.getDLQHandler();
-
+
Connection connection = activation.getConnection();
// Create the session
@@ -110,25 +109,25 @@
session = xaSession.getSession();
}
else
- {
- transacted = spec.isSessionTransacted();
- acknowledge = spec.getAcknowledgeModeInt();
+ {
+ transacted = spec.isSessionTransacted();
+ acknowledge = spec.getAcknowledgeModeInt();
session = connection.createSession(transacted, acknowledge);
}
-
+
// Get the endpoint
MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
XAResource xaResource = null;
if (activation.isDeliveryTransacted() && xaSession != null)
xaResource = xaSession.getXAResource();
-
+
endpoint = endpointFactory.createEndpoint(xaResource);
-
+
// Set the message listener
session.setMessageListener(this);
}
-
+
/**
* Stop the session
*/
@@ -164,25 +163,25 @@
log.debug("Error releasing session " + session, t);
}
}
-
+
public void onMessage(Message message)
{
try
{
endpoint.beforeDelivery(JmsActivation.ONMESSAGE);
-
+
try
{
if (dlqHandler == null || dlqHandler.handleRedeliveredMessage(message) == false)
{
- MessageListener listener = (MessageListener)endpoint;
+ MessageListener listener = (MessageListener) endpoint;
listener.onMessage(message);
}
}
finally
{
endpoint.afterDelivery();
-
+
if (dlqHandler != null)
dlqHandler.messageDelivered(message);
}
@@ -191,13 +190,12 @@
catch (Throwable t)
{
log.error("Unexpected error delivering message " + message, t);
-
- if(txnStrategy != null)
+
+ if (txnStrategy != null)
txnStrategy.error();
-
+
}
-
-
+
}
public Session getSession() throws JMSException
@@ -207,7 +205,7 @@
public void start() throws JMSException
{
- JmsActivation activation = pool.getActivation();
+ JmsActivation activation = pool.getActivation();
WorkManager workManager = activation.getWorkManager();
try
{
@@ -222,46 +220,48 @@
public void run()
{
-
+
try
{
txnStrategy = createTransactionDemarcation();
-
- }catch(Throwable t)
+
+ }
+ catch (Throwable t)
{
log.error("Error creating transaction demarcation. Cannot continue.");
return;
}
-
-
+
try
- {
+ {
session.run();
- }
- catch(Throwable t)
+ }
+ catch (Throwable t)
{
if (txnStrategy != null)
txnStrategy.error();
-
- }finally
+
+ }
+ finally
{
- if(txnStrategy != null)
+ if (txnStrategy != null)
txnStrategy.end();
txnStrategy = null;
}
-
+
}
-
+
private TransactionDemarcationStrategy createTransactionDemarcation()
{
return new DemarcationStrategyFactory().getStrategy();
-
+
}
+
public void release()
{
}
-
+
public void workAccepted(WorkEvent e)
{
}
@@ -276,58 +276,60 @@
pool.returnServerSession(this);
}
-
public void workStarted(WorkEvent e)
{
}
-
+
private class DemarcationStrategyFactory
{
-
+
TransactionDemarcationStrategy getStrategy()
{
TransactionDemarcationStrategy current = null;
final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
final JmsActivation activation = pool.getActivation();
-
- if(activation.isDeliveryTransacted() && xaSession != null)
+
+ if (activation.isDeliveryTransacted() && xaSession != null)
{
try
{
current = new XATransactionDemarcationStrategy();
- }
+ }
catch (Throwable t)
{
log.error(this + " error creating transaction demarcation ", t);
- }
-
- }else
+ }
+
+ }
+ else
{
-
- return new LocalDemarcationStrategy();
-
+
+ return new LocalDemarcationStrategy();
+
}
-
+
return current;
}
-
+
}
+
private interface TransactionDemarcationStrategy
{
void error();
+
void end();
-
+
}
-
+
private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
{
public void end()
{
final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
-
- if(spec.isSessionTransacted())
+
+ if (spec.isSessionTransacted())
{
- if(session != null)
+ if (session != null)
{
try
{
@@ -340,15 +342,15 @@
}
}
}
-
+
public void error()
{
final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
-
- if(spec.isSessionTransacted())
+
+ if (spec.isSessionTransacted())
{
- if(session != null)
-
+ if (session != null)
+
try
{
/*
@@ -360,70 +362,79 @@
* we rollback to force redelivery.
*
*/
- if(pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified())
+ if (pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified())
{
- session.rollback();
+ session.rollback();
}
-
+
}
catch (JMSException e)
{
log.error("Failed to rollback session transaction", e);
}
-
+
}
}
-
+
}
private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
{
-
+
boolean trace = log.isTraceEnabled();
-
+
Transaction trans = null;
+
TransactionManager tm = pool.getActivation().getTransactionManager();;
-
+
public XATransactionDemarcationStrategy() throws Throwable
{
-
- tm.begin();
- try
- {
- trans = tm.getTransaction();
+ final int timeout = pool.getActivation().getActivationSpec().getTransactionTimeout();
- if (trace)
- log.trace(JmsServerSession.this + " using tx=" + trans);
+ if (timeout > 0)
+ {
+ log.trace("Setting transactionTimeout for JMSSessionPool to " + timeout);
+ tm.setTransactionTimeout(timeout);
- if (xaSession != null)
- {
- XAResource res = xaSession.getXAResource();
+ }
- if (!trans.enlistResource(res))
- {
- throw new JMSException("could not enlist resource");
- }
- if (trace)
- log.trace(JmsServerSession.this + " XAResource '" + res + "' enlisted.");
- }
- }
- catch (Throwable t)
+ tm.begin();
+
+ try
+ {
+ trans = tm.getTransaction();
+
+ if (trace)
+ log.trace(JmsServerSession.this + " using tx=" + trans);
+
+ if (xaSession != null)
{
- try
+ XAResource res = xaSession.getXAResource();
+
+ if (!trans.enlistResource(res))
{
- tm.rollback();
+ throw new JMSException("could not enlist resource");
}
- catch (Throwable ignored)
- {
- log.trace(JmsServerSession.this + " ignored error rolling back after failed enlist", ignored);
- }
- throw t;
+ if (trace)
+ log.trace(JmsServerSession.this + " XAResource '" + res + "' enlisted.");
}
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ tm.rollback();
+ }
+ catch (Throwable ignored)
+ {
+ log.trace(JmsServerSession.this + " ignored error rolling back after failed enlist", ignored);
+ }
+ throw t;
+ }
- }
-
-
+ }
+
public void error()
{
// Mark for tollback TX via TM
@@ -440,7 +451,7 @@
}
}
-
+
public void end()
{
try
@@ -484,17 +495,17 @@
{
session.commit();
}
-
- }else
+
+ }
+ else
{
tm.suspend();
-
+
if (xaSession == null && pool.getActivation().isDeliveryTransacted())
{
session.rollback();
}
-
-
+
}
}
More information about the jboss-cvs-commits
mailing list