[Jboss-cvs] JBossAS SVN: r56851 - in branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms: . inflow inflow/dlq
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Sep 14 11:21:24 EDT 2006
Author: bill.burke at jboss.com
Date: 2006-09-14 11:21:22 -0400 (Thu, 14 Sep 2006)
New Revision: 56851
Added:
branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/DefaultRuntimeErrorHandler.java
branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/RuntimeErrorHandler.java
Modified:
branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java
branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/dlq/AbstractDLQHandler.java
Log:
backmerge
Modified: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
===================================================================
--- branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java 2006-09-14 15:20:41 UTC (rev 56850)
+++ branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java 2006-09-14 15:21:22 UTC (rev 56851)
@@ -108,12 +108,6 @@
if (trace)
log.trace("created new managed connection: " + mc);
- // Set default logwriter according to spec
-
- //
- // jason: screw the logWriter stuff for now it sucks ass
- //
-
return mc;
}
Copied: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/DefaultRuntimeErrorHandler.java (from rev 56850, branches/Branch_4_0/connector/src/main/org/jboss/resource/adapter/jms/inflow/DefaultRuntimeErrorHandler.java)
Modified: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
===================================================================
--- branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java 2006-09-14 15:20:41 UTC (rev 56850)
+++ branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java 2006-09-14 15:21:22 UTC (rev 56851)
@@ -42,10 +42,12 @@
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkManager;
+import javax.transaction.TransactionManager;
import org.jboss.jms.jndi.JMSProviderAdapter;
import org.jboss.logging.Logger;
import org.jboss.resource.adapter.jms.JmsResourceAdapter;
+import org.jboss.tm.TransactionManagerLocator;
import org.jboss.util.Strings;
import org.jboss.util.naming.Util;
@@ -95,6 +97,10 @@
/** The DLQ handler */
protected DLQHandler dlqHandler;
+ /** The TransactionManager */
+ protected TransactionManager tm;
+
+
static
{
try
@@ -153,6 +159,17 @@
{
return ra.getWorkManager();
}
+
+ public TransactionManager getTransactionManager()
+ {
+ if(tm == null)
+ {
+ tm = TransactionManagerLocator.getInstance().locate();
+
+ }
+
+ return tm;
+ }
/**
* @return the connection
@@ -212,6 +229,7 @@
public void handleFailure(Throwable failure)
{
log.warn("Failure in jms activation " + spec, failure);
+ boolean reconnected = false;
while (deliveryActive.get())
{
@@ -230,11 +248,18 @@
try
{
setup();
+ reconnected = true;
}
catch (Throwable t)
{
log.error("Unable to reconnect " + spec, t);
}
+
+ if(reconnected)
+ {
+ log.info("Reconnected to JMS provider " + spec);
+ break;
+ }
}
}
Modified: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
===================================================================
--- branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java 2006-09-14 15:20:41 UTC (rev 56850)
+++ branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java 2006-09-14 15:21:22 UTC (rev 56851)
@@ -91,7 +91,7 @@
/** The keep alive time for sessions */
private long keepAlive = 60000;
- /** Is the ession transacted */
+ /** Is the session transacted */
private boolean sessionTransacted = true;
/** The DLQ handler class */
Modified: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
===================================================================
--- branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java 2006-09-14 15:20:41 UTC (rev 56850)
+++ branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java 2006-09-14 15:21:22 UTC (rev 56851)
@@ -36,6 +36,9 @@
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
+import javax.transaction.Status;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.jboss.logging.Logger;
@@ -44,6 +47,7 @@
* A generic jms session pool.
*
* @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="mailto:weston.price at jboss.com">Weston Price</a>
* @version $Revision$
*/
public class JmsServerSession implements ServerSession, MessageListener, Work, WorkListener
@@ -71,7 +75,10 @@
/** Any DLQ handler */
DLQHandler dlqHandler;
-
+
+ /** The handler notified of any Throwable caught while delivering a message */
+ RuntimeErrorHandler runtimeHandler = new DefaultRuntimeErrorHandler();
+
/**
* Create a new JmsServerSession
*
@@ -80,6 +87,7 @@
public JmsServerSession(JmsServerSessionPool pool)
{
this.pool = pool;
+
}
/**
@@ -156,9 +164,19 @@
public void onMessage(Message message)
{
+ TransactionDemarcationStrategy td = null;
+
+ if (JmsServerSessionPool.USE_OLD)
+ {
+ td = createTransactionDemarcation();
+ if (td == null)
+ return;
+ }
+
try
{
endpoint.beforeDelivery(JmsActivation.ONMESSAGE);
+
try
{
if (dlqHandler == null || dlqHandler.handleRedeliveredMessage(message) == false)
@@ -171,10 +189,23 @@
dlqHandler.messageDelivered(message);
}
}
+
catch (Throwable t)
{
log.error("Unexpected error delivering message " + message, t);
+
+ if (td != null)
+ td.error();
+
+ runtimeHandler.handleRuntimeError(t);
+
}
+ finally
+ {
+ if (td != null)
+ td.end();
+ }
+
}
public Session getSession() throws JMSException
@@ -199,9 +230,43 @@
public void run()
{
- session.run();
+ TransactionDemarcationStrategy td = null;
+
+ if (JmsServerSessionPool.USE_OLD == false)
+ {
+ td = createTransactionDemarcation();
+
+ if (td == null)
+ return;
+ }
+
+ try
+ {
+ if (JmsServerSessionPool.USE_OLD && xaSession != null)
+ xaSession.run();
+
+ else
+ session.run();
+
+ }
+ catch(Throwable t)
+ {
+ if (td != null)
+ td.error();
+
+ }finally
+ {
+ if (td != null)
+ td.end();
+ }
+
}
-
+
+ private TransactionDemarcationStrategy createTransactionDemarcation()
+ {
+ return new DemarcationStrategyFactory().getStrategy();
+
+ }
public void release()
{
}
@@ -224,4 +289,202 @@
public void workStarted(WorkEvent e)
{
}
+
+ private class DemarcationStrategyFactory
+ {
+
+ public DemarcationStrategyFactory()
+ {
+ }
+
+ TransactionDemarcationStrategy getStrategy()
+ {
+
+ if(pool.getActivation().isDeliveryTransacted())
+ {
+ try
+ {
+ return new XATransactionDemarcationStrategy();
+ }
+ catch (Throwable t)
+ {
+ log.error(this + " error creating transaction demarcation ", t);
+ return null;
+ }
+
+ }else
+ {
+ return new LocalDemarcationStrategy();
+
+ }
+
+ }
+
+ }
+ private interface TransactionDemarcationStrategy
+ {
+ void error();
+ void end();
+
+ }
+
+ private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
+ {
+ public void end()
+ {
+ if(pool.getActivation().getActivationSpec().isSessionTransacted())
+ {
+ if(session != null)
+ {
+ try
+ {
+ session.commit();
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to commit session transaction", e);
+ }
+ }
+ }
+ }
+
+ public void error()
+ {
+ if(pool.getActivation().getActivationSpec().isSessionTransacted())
+ {
+ if(session != null)
+
+ try
+ {
+ session.rollback();
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to rollback session transaction", e);
+ }
+
+ }
+ }
+
+ }
+
+ private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
+ {
+
+ boolean trace = log.isTraceEnabled();
+
+ Transaction trans = null;
+ TransactionManager tm = pool.getActivation().getTransactionManager();;
+
+ public XATransactionDemarcationStrategy() throws Throwable
+ {
+
+ tm.begin();
+
+ try
+ {
+ trans = tm.getTransaction();
+
+ if (trace)
+ log.trace(JmsServerSession.this + " using tx=" + trans);
+
+ if (xaSession != null)
+ {
+ XAResource res = xaSession.getXAResource();
+
+ if (!trans.enlistResource(res))
+ {
+ throw new JMSException("could not enlist resource");
+ }
+ if (trace)
+ log.trace(JmsServerSession.this + " XAResource '" + res + "' enlisted.");
+ }
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ tm.rollback();
+ }
+ catch (Throwable ignored)
+ {
+ log.trace(JmsServerSession.this + " ignored error rolling back after failed enlist", ignored);
+ }
+ throw t;
+ }
+
+ }
+
+
+ public void error()
+ {
+ // Mark the TX for rollback via the TM
+ try
+ {
+
+ if (trace)
+ log.trace(JmsServerSession.this + " using TM to mark TX for rollback tx=" + trans);
+ trans.setRollbackOnly();
+ }
+ catch (Throwable t)
+ {
+ log.error(JmsServerSession.this + " failed to set rollback only", t);
+ }
+
+ }
+
+ public void end()
+ {
+ try
+ {
+
+ // Use the TM to commit the Tx (assert the correct association)
+ Transaction currentTx = tm.getTransaction();
+ if (trans.equals(currentTx) == false)
+ throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
+
+ // Marked rollback
+ if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
+ {
+ if (trace)
+ log.trace(JmsServerSession.this + " rolling back JMS transaction tx=" + trans);
+ // actually roll it back
+ tm.rollback();
+
+ // NO XASession? then manually rollback.
+ // This is not so good but
+ // it's the best we can do if we have no XASession.
+ if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+ {
+ session.rollback();
+ }
+ }
+
+ else if (trans.getStatus() == Status.STATUS_ACTIVE)
+ {
+ // Commit tx
+ // This will happen if
+ // a) everything goes well
+ // b) app. exception was thrown
+ if (trace)
+ log.trace(JmsServerSession.this + " commiting the JMS transaction tx=" + trans);
+ tm.commit();
+
+ // NO XASession? then manually commit. This is not so good but
+ // it's the best we can do if we have no XASession.
+ if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+ {
+ session.commit();
+ }
+ }
+
+ }
+ catch (Throwable t)
+ {
+ log.error(JmsServerSession.this + " failed to commit/rollback", t);
+ }
+
+ }
+
+ }
}
\ No newline at end of file
Modified: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java
===================================================================
--- branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java 2006-09-14 15:20:41 UTC (rev 56850)
+++ branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSessionPool.java 2006-09-14 15:21:22 UTC (rev 56851)
@@ -21,6 +21,8 @@
*/
package org.jboss.resource.adapter.jms.inflow;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import javax.jms.Connection;
@@ -43,7 +45,20 @@
{
/** The logger */
private static final Logger log = Logger.getLogger(JmsServerSessionPool.class);
-
+
+ public static final boolean USE_OLD;
+
+ static
+ {
+ USE_OLD = ((Boolean) AccessController.doPrivileged(new PrivilegedAction()
+ {
+ public Object run()
+ {
+ return new Boolean(System.getProperty("org.jboss.jms.asf.useold", "false"));
+ }
+ })).booleanValue();
+ }
+
/** The activation */
JmsActivation activation;
@@ -273,4 +288,5 @@
log.debug("Error closing the consumer " + consumer, t);
}
}
+
}
\ No newline at end of file
Copied: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/RuntimeErrorHandler.java (from rev 56850, branches/Branch_4_0/connector/src/main/org/jboss/resource/adapter/jms/inflow/RuntimeErrorHandler.java)
Modified: branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/dlq/AbstractDLQHandler.java
===================================================================
--- branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/dlq/AbstractDLQHandler.java 2006-09-14 15:20:41 UTC (rev 56850)
+++ branches/JBoss_4_0_4_GA_EJB3_RC9/connector/src/main/org/jboss/resource/adapter/jms/inflow/dlq/AbstractDLQHandler.java 2006-09-14 15:21:22 UTC (rev 56851)
@@ -34,6 +34,7 @@
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
+import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import org.jboss.jms.jndi.JMSProviderAdapter;
More information about the jboss-cvs-commits
mailing list