[jboss-cvs] JBossAS SVN: r57563 - branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Oct 11 10:51:27 EDT 2006
Author: weston.price at jboss.com
Date: 2006-10-11 10:51:26 -0400 (Wed, 11 Oct 2006)
New Revision: 57563
Modified:
branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java
Log:
[JBAS-3657][JBAS-3750] Improvements for ASF integration, pool management, idle session collection
as and dynamic allocation. Also improved provider logging on failure.
Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java 2006-10-11 14:43:13 UTC (rev 57562)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java 2006-10-11 14:51:26 UTC (rev 57563)
@@ -107,7 +107,7 @@
this.xaSession = xaSession;
this.delegateListener = delegateListener;
- if( xaSession == null )
+ if(xaSession == null)
useLocalTX = false;
this.useLocalTX = useLocalTX;
@@ -117,14 +117,18 @@
log.debug("initializing (pool, session, xaSession, useLocalTX): " +
pool + ", " + session + ", " + xaSession + ", " + useLocalTX);
- if (StdServerSessionPool .USE_OLD && xaSession != null)
+ if (StdServerSessionPool.USE_OLD && xaSession != null)
+ {
xaSession.setMessageListener(this);
+
+ }
else
- session.setMessageListener(this);
+ {
+ session.setMessageListener(this);
+ }
InitialContext ctx = null;
-
-
+
try
{
@@ -153,7 +157,6 @@
}
}
- // --- Impl of JMS standard API
/**
* Returns the session. <p>
@@ -173,8 +176,6 @@
return session;
}
- //--- Protected parts, used by other in the package
-
/**
* Runs in an own thread, basically calls the session.run(), it is up to the
* session to have been filled with messages and it will run against the
@@ -200,22 +201,33 @@
try
{
- if (StdServerSessionPool.USE_OLD && xaSession != null)
+ if (StdServerSessionPool.USE_OLD && xaSession != null)
+ {
xaSession.run();
-
+
+ }
else
+ {
session.run();
+
+ }
}catch(Throwable t)
{
+
if (td != null)
+ {
td.error();
+
+ }
}finally
{
if(td != null)
+ {
td.end();
-
+
+ }
recycle();
}
}
@@ -361,7 +373,7 @@
}
private TransactionDemarcationStrategy createTransactionDemarcation()
- {
+ {
return new DemarcationStrategyFactory().getStrategy();
}
@@ -380,198 +392,187 @@
}
TransactionDemarcationStrategy getStrategy()
- {
-
- if(!useLocalTX)
+ {
+ try
{
- try
- {
- log.trace("Creating XATransactionDemarcationStrategy");
- return new XATransactionDemarcationStrategy();
- }
- catch (Throwable t)
- {
- log.error(this + " error creating transaction demarcation ", t);
- return null;
- }
-
- }else
- {
- log.trace("Creating LocalDemarcationStrategy");
- return new LocalDemarcationStrategy();
-
+ return new TransactionDemarcation();
}
-
- }
-
- }
- private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
- {
- public void end()
- {
- if(serverSessionPool.isTransacted())
+ catch (Throwable t)
{
- if(session != null)
- {
- try
- {
- session.commit();
- }
- catch (JMSException e)
- {
- log.error("Failed to commit session transaction", e);
- }
- }
+ log.error(this + " error creating transaction demarcation ", t);
+ return null;
}
}
-
- public void error()
- {
- if(serverSessionPool.isTransacted())
- {
- if(session != null)
-
- try
- {
- session.rollback();
- }
- catch (JMSException e)
- {
- log.error("Failed to rollback session transaction", e);
- }
-
- }
- }
}
+ private class TransactionDemarcation implements TransactionDemarcationStrategy
+ {
+ boolean trace = log.isTraceEnabled();
- private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
- {
-
- boolean trace = log.isTraceEnabled();
-
+ Xid localXid = null;
+ boolean localRollbackFlag = false;
Transaction trans = null;
- public XATransactionDemarcationStrategy() throws Throwable
- {
-
- tm.begin();
-
- try
+ public TransactionDemarcation() throws Throwable
+ {
+ if(useLocalTX)
{
- trans = tm.getTransaction();
+ localXid = xidFactory.newXid();
+ XAResource res = xaSession.getXAResource();
+ res.start(localXid, XAResource.TMNOFLAGS);
+
+ if(trace)
+ log.trace(StdServerSession.this + " using optimized 1p commit to control Tx. xid=" + localXid);
+ }
+ else
+ {
+ tm.begin();
- if (trace)
- log.trace(StdServerSession.this + " using tx=" + trans);
-
- if (xaSession != null)
+ try
{
- XAResource res = xaSession.getXAResource();
+ trans = tm.getTransaction();
- if (!trans.enlistResource(res))
+ if (trace)
+ log.trace(StdServerSession.this + " using tx=" + trans);
+
+ if (xaSession != null)
{
- throw new JMSException("could not enlist resource");
+ XAResource res = xaSession.getXAResource();
+
+ if (!trans.enlistResource(res))
+ {
+ throw new JMSException("could not enlist resource");
+ }
+ if (trace)
+ log.trace(StdServerSession.this + " XAResource '" + res + "' enlisted.");
}
- if (trace)
- log.trace(StdServerSession.this + " XAResource '" + res + "' enlisted.");
}
- }
- catch (Throwable t)
- {
- try
+ catch (Throwable t)
{
- tm.rollback();
+ try
+ {
+ tm.rollback();
+ }
+ catch (Throwable ignored)
+ {
+ log.trace(StdServerSession.this + " ignored error rolling back after failed enlist", ignored);
+ }
+ throw t;
}
- catch (Throwable ignored)
- {
- log.trace(StdServerSession.this + " ignored error rolling back after failed enlist", ignored);
- }
- throw t;
- }
+ }
}
public void error()
{
- try
+ if(useLocalTX)
{
+ localRollbackFlag = true;
+
+ }else
+ {
+ try
+ {
- if (trace)
- log.trace(StdServerSession.this + " using TM to mark TX for rollback tx=" + trans);
+ if (trace)
+ log.trace(StdServerSession.this + " using TM to mark TX for rollback tx=" + trans);
+ trans.setRollbackOnly();
+ }
+ catch (Throwable t)
+ {
+ log.error(StdServerSession.this + " failed to set rollback only", t);
+ }
- trans.setRollbackOnly();
}
- catch (Throwable t)
- {
- log.error(StdServerSession.this + " failed to set rollback only", t);
- }
-
+
}
public void end()
{
+
try
{
- // Use the TM to commit the Tx (assert the correct association)
- Transaction currentTx = tm.getTransaction();
-
- if (trans.equals(currentTx) == false)
- throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
-
- // Marked rollback
- if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
+ if(useLocalTX)
{
- if (trace)
- log.trace(StdServerSession.this + " rolling back JMS transaction tx=" + trans);
- // actually roll it back
- tm.rollback();
-
- // NO XASession? then manually rollback.
- // This is not so good but
- // it's the best we can do if we have no XASession.
- if (xaSession == null && serverSessionPool.isTransacted())
+ if(localRollbackFlag)
{
- session.rollback();
+ XAResource res = xaSession.getXAResource();
+ res.end(localXid, XAResource.TMSUCCESS);
+ res.rollback(localXid);
}
- }
- else if (trans.getStatus() == Status.STATUS_ACTIVE)
- {
- // Commit tx
- // This will happen if
- // a) everything goes well
- // b) app. exception was thrown
- if (trace)
- log.trace(StdServerSession.this + " commiting the JMS transaction tx=" + trans);
- tm.commit();
-
- // NO XASession? then manually commit. This is not so good but
- // it's the best we can do if we have no XASession.
- if (xaSession == null && serverSessionPool.isTransacted())
+ else
{
- session.commit();
+ XAResource res = xaSession.getXAResource();
+ res.end(localXid, XAResource.TMSUCCESS);
+ res.commit(localXid, true);
+
}
+
}
else
{
- tm.suspend();
-
- if (xaSession == null && serverSessionPool.isTransacted())
+
+ // Use the TM to commit the Tx (assert the correct association)
+ Transaction currentTx = tm.getTransaction();
+
+ if (trans.equals(currentTx) == false)
+ throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
+
+ // Marked rollback
+ if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
{
- session.commit();
+ if (trace)
+ log.trace(StdServerSession.this + " rolling back JMS transaction tx=" + trans);
+ // actually roll it back
+ tm.rollback();
+
+ // NO XASession? then manually rollback.
+ // This is not so good but
+ // it's the best we can do if we have no XASession.
+ if (xaSession == null && serverSessionPool.isTransacted())
+ {
+ session.rollback();
+ }
}
+ else if (trans.getStatus() == Status.STATUS_ACTIVE)
+ {
+ // Commit tx
+ // This will happen if
+ // a) everything goes well
+ // b) app. exception was thrown
+ if (trace)
+ log.trace(StdServerSession.this + " commiting the JMS transaction tx=" + trans);
+ tm.commit();
+
+ // NO XASession? then manually commit. This is not so good but
+ // it's the best we can do if we have no XASession.
+ if (xaSession == null && serverSessionPool.isTransacted())
+ {
+ session.commit();
+ }
+ }
+ else
+ {
+ tm.suspend();
+
+ if (xaSession == null && serverSessionPool.isTransacted())
+ {
+ session.commit();
+ }
+ }
+
}
-
-
+
+
}
- catch (Throwable t)
+ catch(Throwable t)
{
- log.error(StdServerSession.this + " failed to commit/rollback", t);
+ log.error(StdServerSession.this + "failed to commit/rollback", t);
+
}
-
}
-
}
}
More information about the jboss-cvs-commits
mailing list