[jboss-cvs] JBossAS SVN: r59868 - branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Jan 20 01:05:21 EST 2007
Author: weston.price at jboss.com
Date: 2007-01-20 01:05:18 -0500 (Sat, 20 Jan 2007)
New Revision: 59868
Modified:
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java
Log:
[JBAS-4001] Backport of the JCA/JMS adapter changes to address issues in
the old code when using a 1PC resource, and to handle redelivery for
BMT and CMT NotSupported listeners.
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java 2007-01-19 23:40:15 UTC (rev 59867)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java 2007-01-20 06:05:18 UTC (rev 59868)
@@ -162,13 +162,13 @@
public TransactionManager getTransactionManager()
{
- if(tm == null)
- {
- tm = TransactionManagerLocator.getInstance().locate();
-
- }
-
- return tm;
+ if (tm == null)
+ {
+ tm = TransactionManagerLocator.getInstance().locate();
+
+ }
+
+ return tm;
}
/**
@@ -229,9 +229,9 @@
public void handleFailure(Throwable failure)
{
log.warn("Failure in jms activation " + spec, failure);
- boolean reconnected = false;
+ int reconnectCount = 0;
- while (deliveryActive.get())
+ while (deliveryActive.get() && reconnectCount < spec.getReconnectAttempts())
{
teardown();
try
@@ -248,18 +248,16 @@
try
{
setup();
- reconnected = true;
+ log.info("Reconnected with messaging provider.");
+ break;
}
catch (Throwable t)
{
log.error("Unable to reconnect " + spec, t);
}
- if(reconnected)
- {
- log.info("Reconnected to JMS provider " + spec);
- break;
- }
+ ++reconnectCount;
+
}
}
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java 2007-01-19 23:40:15 UTC (rev 59867)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java 2007-01-20 06:05:18 UTC (rev 59868)
@@ -114,7 +114,13 @@
/** The DLQ max resent */
private int dLQMaxResent;
-
+
+ //Default to 5 attempts
+ private int reconnectAttempts = 5;
+
+ //Used to specify whether or not we should attempt to redeliver a message in an unspecified txn context
+ private boolean redeliverUnspecified = true;
+
/**
* @return the acknowledgeMode.
*/
@@ -645,4 +651,24 @@
buffer.append(')');
return buffer.toString();
}
+
+ public int getReconnectAttempts()
+ {
+ return reconnectAttempts;
+ }
+
+ public void setReconnectAttempts(int reconnectAttempts)
+ {
+ this.reconnectAttempts = reconnectAttempts;
+ }
+
+ public boolean getRedeliverUnspecified()
+ {
+ return redeliverUnspecified;
+ }
+
+ public void setRedeliverUnspecified(boolean redeliverUnspecified)
+ {
+ this.redeliverUnspecified = redeliverUnspecified;
+ }
}
\ No newline at end of file
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java 2007-01-19 23:40:15 UTC (rev 59867)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java 2007-01-20 06:05:18 UTC (rev 59868)
@@ -76,9 +76,9 @@
/** Any DLQ handler */
DLQHandler dlqHandler;
+
+ TransactionDemarcationStrategy txnStrategy;
- /** The runtimeHandler */
- RuntimeErrorHandler runtimeHandler = new DefaultRuntimeErrorHandler();
/**
* Create a new JmsServerSession
@@ -104,23 +104,25 @@
Connection connection = activation.getConnection();
// Create the session
- if (connection instanceof XAConnection)
+ if (connection instanceof XAConnection && activation.isDeliveryTransacted())
{
xaSession = ((XAConnection) connection).createXASession();
session = xaSession.getSession();
}
else
- {
- transacted = spec.isSessionTransacted();
- acknowledge = spec.getAcknowledgeModeInt();
+ {
+ transacted = spec.isSessionTransacted();
+ acknowledge = spec.getAcknowledgeModeInt();
session = connection.createSession(transacted, acknowledge);
}
// Get the endpoint
MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
XAResource xaResource = null;
+
if (activation.isDeliveryTransacted() && xaSession != null)
xaResource = xaSession.getXAResource();
+
endpoint = endpointFactory.createEndpoint(xaResource);
// Set the message listener
@@ -165,15 +167,6 @@
public void onMessage(Message message)
{
- TransactionDemarcationStrategy td = null;
-
- if (JmsServerSessionPool.USE_OLD)
- {
- td = createTransactionDemarcation();
- if (td == null)
- return;
- }
-
try
{
endpoint.beforeDelivery(JmsActivation.ONMESSAGE);
@@ -181,11 +174,15 @@
try
{
if (dlqHandler == null || dlqHandler.handleRedeliveredMessage(message) == false)
- ((MessageListener) endpoint).onMessage(message);
+ {
+ MessageListener listener = (MessageListener)endpoint;
+ listener.onMessage(message);
+ }
}
finally
{
endpoint.afterDelivery();
+
if (dlqHandler != null)
dlqHandler.messageDelivered(message);
}
@@ -195,17 +192,11 @@
{
log.error("Unexpected error delivering message " + message, t);
- if (td != null)
- td.error();
+ if(txnStrategy != null)
+ txnStrategy.error();
- runtimeHandler.handleRuntimeError(t);
-
}
- finally
- {
- if (td != null)
- td.end();
- }
+
}
@@ -231,34 +222,33 @@
public void run()
{
- TransactionDemarcationStrategy td = null;
- if (JmsServerSessionPool.USE_OLD == false)
+ try
{
- td = createTransactionDemarcation();
+ txnStrategy = createTransactionDemarcation();
- if (td == null)
- return;
+ }catch(Throwable t)
+ {
+ log.error("Error creating transaction demarcation. Cannot continue.");
+ return;
}
+
try
{
- if (JmsServerSessionPool.USE_OLD && xaSession != null)
- xaSession.run();
-
- else
- session.run();
-
- }
+ session.run();
+ }
catch(Throwable t)
{
- if (td != null)
- td.error();
+ if (txnStrategy != null)
+ txnStrategy.error();
}finally
{
- if (td != null)
- td.end();
+ if(txnStrategy != null)
+ txnStrategy.end();
+
+ txnStrategy = null;
}
}
@@ -294,31 +284,31 @@
private class DemarcationStrategyFactory
{
- public DemarcationStrategyFactory()
- {
- }
-
TransactionDemarcationStrategy getStrategy()
{
+ TransactionDemarcationStrategy current = null;
+ final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
+ final JmsActivation activation = pool.getActivation();
- if(pool.getActivation().isDeliveryTransacted())
+ if(activation.isDeliveryTransacted() && xaSession != null)
{
try
{
- return new XATransactionDemarcationStrategy();
- }
+ current = new XATransactionDemarcationStrategy();
+ }
catch (Throwable t)
{
log.error(this + " error creating transaction demarcation ", t);
- return null;
}
}else
{
- return new LocalDemarcationStrategy();
+
+ return new LocalDemarcationStrategy();
}
+ return current;
}
}
@@ -333,7 +323,9 @@
{
public void end()
{
- if(pool.getActivation().getActivationSpec().isSessionTransacted())
+ final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
+
+ if(spec.isSessionTransacted())
{
if(session != null)
{
@@ -351,13 +343,28 @@
public void error()
{
- if(pool.getActivation().getActivationSpec().isSessionTransacted())
+ final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
+
+ if(spec.isSessionTransacted())
{
if(session != null)
try
{
- session.rollback();
+ /*
+ * Looks strange, but this basically means
+ *
+ * If the underlying connection was non-XA and the transaction attribute is REQUIRED
+ * we rollback. Also, if the underlying connection was non-XA and the transaction
+ * attribute is NOT_SUPPORT and the non standard redelivery behavior is enabled
+ * we rollback to force redelivery.
+ *
+ */
+ if(pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified())
+ {
+ session.rollback();
+ }
+
}
catch (JMSException e)
{
@@ -371,9 +378,9 @@
private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
{
-
- boolean trace = log.isTraceEnabled();
-
+
+ boolean trace = log.isTraceEnabled();
+
Transaction trans = null;
TransactionManager tm = pool.getActivation().getTransactionManager();;
@@ -391,8 +398,8 @@
if (xaSession != null)
{
- XAResource res = JcaXAResourceWrapperFactory.getResourceWrapper(xaSession.getXAResource());
-
+ XAResource res = xaSession.getXAResource();
+
if (!trans.enlistResource(res))
{
throw new JMSException("could not enlist resource");
@@ -477,6 +484,17 @@
{
session.commit();
}
+
+ }else
+ {
+ tm.suspend();
+
+ if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+ {
+ session.rollback();
+ }
+
+
}
}
Modified: branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java
===================================================================
--- branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java 2007-01-19 23:40:15 UTC (rev 59867)
+++ branches/Branch_4_2/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java 2007-01-20 06:05:18 UTC (rev 59868)
@@ -45,20 +45,7 @@
{
/** The logger */
private static final Logger log = Logger.getLogger(JmsServerSessionPool.class);
-
- public static final boolean USE_OLD;
-
- static
- {
- USE_OLD = ((Boolean) AccessController.doPrivileged(new PrivilegedAction()
- {
- public Object run()
- {
- return new Boolean(System.getProperty("org.jboss.jms.asf.useold", "false"));
- }
- })).booleanValue();
- }
-
+
/** The activation */
JmsActivation activation;
@@ -73,7 +60,8 @@
/** The number of sessions */
int sessionCount = 0;
-
+
+
/**
* Create a new session pool
*
@@ -119,6 +107,7 @@
log.trace("getServerSession");
ServerSession result = null;
+
try
{
synchronized (serverSessions)
@@ -126,13 +115,16 @@
while (true)
{
int sessionsSize = serverSessions.size();
+
if (stopped)
throw new Exception("Cannot get a server session after the pool is stopped");
+
else if (sessionsSize > 0)
{
result = (ServerSession) serverSessions.remove(sessionsSize-1);
break;
}
+
else
{
try
More information about the jboss-cvs-commits
mailing list