[Jboss-cvs] JBossAS SVN: r56642 - branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Sep 8 06:15:27 EDT 2006
Author: weston.price at jboss.com
Date: 2006-09-08 06:15:25 -0400 (Fri, 08 Sep 2006)
New Revision: 56642
Modified:
branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java
branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java
Log:
[JBAS-3511] Improvements for ASF integration, pool management, idle session collection
as well as dynamic allocation.
Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java 2006-09-08 08:28:29 UTC (rev 56641)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java 2006-09-08 10:15:25 UTC (rev 56642)
@@ -106,8 +106,10 @@
this.session = session;
this.xaSession = xaSession;
this.delegateListener = delegateListener;
+
if( xaSession == null )
useLocalTX = false;
+
this.useLocalTX = useLocalTX;
this.xidFactory = xidFactory;
@@ -115,10 +117,12 @@
log.debug("initializing (pool, session, xaSession, useLocalTX): " +
pool + ", " + session + ", " + xaSession + ", " + useLocalTX);
- // Set out self as message listener
if (xaSession != null)
+
xaSession.setMessageListener(this);
+
else
+
session.setMessageListener(this);
InitialContext ctx = null;
@@ -183,24 +187,39 @@
public void run()
{
boolean trace = log.isTraceEnabled();
+
if (trace)
- log.trace("running...");
+ log.trace("Running StdServerSession" + this);
+
+ TransactionDemarcationStrategy td = null;
+
+ if (StdServerSessionPool.USE_OLD == false)
+ {
+ td = createTransactionDemarcation();
+
+ if (td == null)
+ return;
+ }
+
try
{
- if (xaSession != null)
+ if (StdServerSessionPool.USE_OLD && xaSession != null)
xaSession.run();
+
else
session.run();
- }
- finally
+
+ }catch(Throwable t)
{
- if (trace)
- log.trace("recycling...");
-
+ if (td != null)
+ td.error();
+
+ }finally
+ {
+ if(td != null)
+ td.end();
+
recycle();
-
- if (trace)
- log.trace("finished run");
}
}
@@ -219,152 +238,40 @@
public void onMessage(Message msg)
{
boolean trace = log.isTraceEnabled();
+
if( trace )
log.trace("onMessage running (pool, session, xaSession, useLocalTX): " +
", " + session + ", " + xaSession + ", " + useLocalTX);
-
- // Used if run with useLocalTX if true
- Xid localXid = null;
- boolean localRollbackFlag=false;
- // Used if run with useLocalTX if false
- Transaction trans = null;
+
+ TransactionDemarcationStrategy td = null;
+
+ if (StdServerSessionPool.USE_OLD)
+ {
+ td = createTransactionDemarcation();
+ if (td == null)
+ return;
+ }
+
try
{
+ delegateListener.onMessage(msg);
- if (useLocalTX)
- {
- // Use JBossMQ One Phase Commit to commit the TX
- localXid = xidFactory.newXid();//new XidImpl();
- XAResource res = xaSession.getXAResource();
- res.start(localXid, XAResource.TMNOFLAGS);
-
- if( trace )
- log.trace("Using optimized 1p commit to control TX.");
- }
- else
- {
-
- // Use the TM to control the TX
- tm.begin();
- trans = tm.getTransaction();
-
- if (xaSession != null)
- {
- XAResource res = xaSession.getXAResource();
- if (!trans.enlistResource(res))
- {
- throw new JMSException("could not enlist resource");
- }
- if( trace )
- log.trace("XAResource '" + res + "' enlisted.");
- }
- }
- //currentTransactionId = connection.spyXAResourceManager.startTx();
-
- // run the session
- //session.run();
- // Call delegate listener
- delegateListener.onMessage(msg);
- }
- catch (Exception e)
+ }catch(Throwable t)
{
- log.error("session failed to run; setting rollback only", e);
+ log.error("Unexpected error invoking on message " + msg, t);
- if (useLocalTX)
- {
- // Use JBossMQ One Phase Commit to commit the TX
- localRollbackFlag = true;
- }
- else
- {
- // Mark for tollback TX via TM
- try
- {
- // The transaction will be rolledback in the finally
- if( trace )
- log.trace("Using TM to mark TX for rollback.");
- trans.setRollbackOnly();
- }
- catch (Exception x)
- {
- log.error("failed to set rollback only", x);
- }
- }
+ if(td != null)
+ td.error();
}
finally
{
- try
- {
- if (useLocalTX)
- {
- if( localRollbackFlag == true )
- {
- if( trace )
- log.trace("Using optimized 1p commit to rollback TX.");
-
- XAResource res = xaSession.getXAResource();
- res.end(localXid, XAResource.TMSUCCESS);
- res.rollback(localXid);
-
- }
- else
- {
- if( trace )
- log.trace("Using optimized 1p commit to commit TX.");
-
- XAResource res = xaSession.getXAResource();
- res.end(localXid, XAResource.TMSUCCESS);
- res.commit(localXid, true);
- }
- }
- else
- {
- // Use the TM to commit the Tx (assert the correct association)
- Transaction currentTx = tm.getTransaction();
- if (trans.equals(currentTx) == false)
- throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
-
- // Marked rollback
- if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
- {
- if( trace )
- log.trace("Rolling back JMS transaction");
- // actually roll it back
- tm.rollback();
-
- // NO XASession? then manually rollback.
- // This is not so good but
- // it's the best we can do if we have no XASession.
- if (xaSession == null && serverSessionPool.isTransacted())
- {
- session.rollback();
- }
- }
- else if (trans.getStatus() == Status.STATUS_ACTIVE)
- {
- // Commit tx
- // This will happen if
- // a) everything goes well
- // b) app. exception was thrown
- if( trace )
- log.trace("Commiting the JMS transaction");
- tm.commit();
-
- // NO XASession? then manually commit. This is not so good but
- // it's the best we can do if we have no XASession.
- if (xaSession == null && serverSessionPool.isTransacted())
- {
- session.commit();
- }
- }
- }
- }
- catch (Exception e)
- {
- log.error("failed to commit/rollback", e);
- }
+ if(td != null)
+ td.end();
}
+
+
+
if( trace )
log.trace("onMessage done");
}
@@ -433,7 +340,7 @@
*/
void recycle()
{
- serverSessionPool.recycle(this);
+ serverSessionPool.recycle(this, false);
}
boolean isTimedOut(long timeout)
@@ -455,5 +362,219 @@
{
return this.hasPermit;
}
+
+ private TransactionDemarcationStrategy createTransactionDemarcation()
+ {
+ return new DemarcationStrategyFactory().getStrategy();
+ }
+
+ private interface TransactionDemarcationStrategy
+ {
+ void error();
+ void end();
+
+ }
+
+ private class DemarcationStrategyFactory
+ {
+
+ public DemarcationStrategyFactory()
+ {
+ }
+
+ TransactionDemarcationStrategy getStrategy()
+ {
+
+ if(!useLocalTX)
+ {
+ try
+ {
+ log.trace("Creating XATransactionDemarcationStrategy");
+ return new XATransactionDemarcationStrategy();
+ }
+ catch (Throwable t)
+ {
+ log.error(this + " error creating transaction demarcation ", t);
+ return null;
+ }
+
+ }else
+ {
+ log.trace("Creating LocalDemarcationStrategy");
+ return new LocalDemarcationStrategy();
+
+ }
+
+ }
+
+ }
+ private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
+ {
+ public void end()
+ {
+ if(serverSessionPool.isTransacted())
+ {
+ if(session != null)
+ {
+ try
+ {
+ session.commit();
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to commit session transaction", e);
+ }
+ }
+ }
+ }
+
+ public void error()
+ {
+ if(serverSessionPool.isTransacted())
+ {
+ if(session != null)
+
+ try
+ {
+ session.rollback();
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to rollback session transaction", e);
+ }
+
+ }
+ }
+
+ }
+
+ private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
+ {
+
+ boolean trace = log.isTraceEnabled();
+
+ Transaction trans = null;
+
+ public XATransactionDemarcationStrategy() throws Throwable
+ {
+
+ tm.begin();
+
+ try
+ {
+ trans = tm.getTransaction();
+
+ if (trace)
+ log.trace(StdServerSession.this + " using tx=" + trans);
+
+ if (xaSession != null)
+ {
+ XAResource res = xaSession.getXAResource();
+
+ if (!trans.enlistResource(res))
+ {
+ throw new JMSException("could not enlist resource");
+ }
+ if (trace)
+ log.trace(StdServerSession.this + " XAResource '" + res + "' enlisted.");
+ }
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ tm.rollback();
+ }
+ catch (Throwable ignored)
+ {
+ log.trace(StdServerSession.this + " ignored error rolling back after failed enlist", ignored);
+ }
+ throw t;
+ }
+
+ }
+
+
+ public void error()
+ {
+ try
+ {
+
+ if (trace)
+ log.trace(StdServerSession.this + " using TM to mark TX for rollback tx=" + trans);
+
+ trans.setRollbackOnly();
+ }
+ catch (Throwable t)
+ {
+ log.error(StdServerSession.this + " failed to set rollback only", t);
+ }
+
+ }
+
+ public void end()
+ {
+ try
+ {
+
+ // Use the TM to commit the Tx (assert the correct association)
+ Transaction currentTx = tm.getTransaction();
+
+ if (trans.equals(currentTx) == false)
+ throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
+
+ // Marked rollback
+ if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
+ {
+ if (trace)
+ log.trace(StdServerSession.this + " rolling back JMS transaction tx=" + trans);
+ // actually roll it back
+ tm.rollback();
+
+ // NO XASession? then manually rollback.
+ // This is not so good but
+ // it's the best we can do if we have no XASession.
+ if (xaSession == null && serverSessionPool.isTransacted())
+ {
+ session.rollback();
+ }
+ }
+ else if (trans.getStatus() == Status.STATUS_ACTIVE)
+ {
+ // Commit tx
+ // This will happen if
+ // a) everything goes well
+ // b) app. exception was thrown
+ if (trace)
+ log.trace(StdServerSession.this + " commiting the JMS transaction tx=" + trans);
+ tm.commit();
+
+ // NO XASession? then manually commit. This is not so good but
+ // it's the best we can do if we have no XASession.
+ if (xaSession == null && serverSessionPool.isTransacted())
+ {
+ session.commit();
+ }
+ }
+ else
+ {
+ tm.suspend();
+
+ if (xaSession == null && serverSessionPool.isTransacted())
+ {
+ session.commit();
+ }
+ }
+
+
+ }
+ catch (Throwable t)
+ {
+ log.error(StdServerSession.this + " failed to commit/rollback", t);
+ }
+
+ }
+
+ }
}
Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java 2006-09-08 08:28:29 UTC (rev 56641)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java 2006-09-08 10:15:25 UTC (rev 56642)
@@ -5,14 +5,10 @@
* See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-
-import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
-import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
-import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Iterator;
-
import java.util.List;
import javax.jms.Connection;
@@ -35,6 +31,11 @@
import org.jboss.logging.Logger;
import org.jboss.tm.XidFactoryMBean;
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
+
/**
* Implementation of ServerSessionPool. <p>
*
@@ -48,6 +49,18 @@
public class StdServerSessionPool
implements ServerSessionPool
{
+ public static final boolean USE_OLD;
+
+ static
+ {
+ USE_OLD = ((Boolean) AccessController.doPrivileged(new PrivilegedAction()
+ {
+ public Object run()
+ {
+ return new Boolean(System.getProperty("org.jboss.jms.asf.useold", "false"));
+ }
+ })).booleanValue();
+ }
/**
* The thread group which session workers will run.
*/
@@ -85,10 +98,13 @@
*/
private boolean transacted;
+ /** The lazyInitialization */
private boolean lazyInitialization;
+ /** The recycleIdleSessions */
private boolean recycleIdleSessions;
+ /** The idleTimeOut */
private long idleTimeOut;
/**
@@ -177,11 +193,12 @@
// finish initializing the session
if(!lazyInitialization)
{
+ log.debug("StdServerSessionPool is configured for non-lazy inititalization. " + poolSize + " sessions being created for use." );
create(this.poolSize);
}else
{
- log.debug("SessionPool is specified as lazy init. No sessions being created.");
+ log.debug("StdServerSessionPool is specified as lazy init. " + poolSize + " session(s) being created");
create(this.minSize);
}
@@ -276,6 +293,7 @@
{
session = (ServerSession)sessionPool.remove(0);
((StdServerSession)session).setHasPermit(true);
+ counter.inUse();
break;
}
else
@@ -283,7 +301,8 @@
log.trace("Creating lazy session.");
session = createLazySession();
((StdServerSession)session).setHasPermit(true);
- break;
+ counter.inUse();
+ break;
}
}
}
@@ -296,6 +315,9 @@
// assert session != null
if( log.isTraceEnabled() )
log.trace("using server session: " + session);
+
+ int size = (int)(poolSize - permits.permits());
+ counter.maxSessionInUse(size);
return session;
}
@@ -374,6 +396,16 @@
{
return numServerSessions;
}
+
+ public int getCurrentSessionInUseCount()
+ {
+ return counter.getInUseCount();
+ }
+
+ public int getSessionMaxInUseCount()
+ {
+ return counter.getMaxSessionInUse();
+ }
// --- Protected messages for StdServerSession to use
/**
@@ -439,7 +471,7 @@
*
* @param session Description of Parameter
*/
- void recycle(StdServerSession session)
+ void recycle(StdServerSession session, boolean isTimedOut)
{
if(session.getHasPermit())
@@ -450,16 +482,20 @@
synchronized (sessionPool)
{
- if (closing)
+ if(!isTimedOut)
+ counter.decInUse();
+
+ if (closing || isTimedOut)
{
session.close();
numServerSessions--;
-
+
if (numServerSessions == 0)
{
//notify clear thread.
sessionPool.notifyAll();
}
+
}
else
{
@@ -471,6 +507,9 @@
log.trace("recycled server session: " + session);
}
}
+
+
+
}
// --- Private methods used internally
@@ -579,6 +618,8 @@
{
long timeout = System.currentTimeMillis() - idleTimeOut;
+ boolean trace = log.isTraceEnabled();
+ List timedOut = new ArrayList();
while(true)
{
@@ -591,7 +632,15 @@
if (session.isTimedOut(timeout))
{
- recycleIdleSession(session);
+ sessionPool.remove(0);
+ timedOut.add(session);
+
+ if(trace)
+ {
+ log.trace("Recycling timed out session" + session);
+
+ }
+
}
else
@@ -600,11 +649,26 @@
}
}
+
else
break;
}
}
+
+ counter.timedOut(timedOut.size());
+
+ for(Iterator iter = timedOut.iterator(); iter.hasNext();)
+ {
+ StdServerSession session = (StdServerSession)iter.next();
+ log.trace("Recycling timed out StdServerSession" + session);
+ recycle(session, true);
+
+ }
+
+ if(!closing && minSize > 0)
+ JmsSessionPoolFiller.fillPool(this);
+
}
@@ -663,22 +727,56 @@
return thread;
}
}
-
+
+ //Simple class to record SessionStatistics
private static class SessionPoolStatisticsCollector
{
private int timedOut;
+ private int inUse;
+ private int maxInUse;
+ synchronized void decInUse()
+ {
+ --inUse;
+ }
+
+ synchronized void inUse()
+ {
+ ++inUse;
+ }
+
+ synchronized void timedOut(int count)
+ {
+ timedOut += count;
+ }
synchronized void timedOut()
{
- timedOut++;
+ ++timedOut;
+ }
+
+ synchronized void maxSessionInUse(int count)
+ {
+ if(count > maxInUse)
+ maxInUse = count;
}
+ private int getMaxSessionInUse()
+ {
+ return maxInUse;
+ }
+
private int getTimedOutCount()
{
return timedOut;
}
- }
+ private int getInUseCount()
+ {
+ return inUse;
+ }
+
+ }
+
}
More information about the jboss-cvs-commits
mailing list