[jboss-cvs] JBossAS SVN: r58913 - branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Dec 7 17:00:40 EST 2006
Author: weston.price at jboss.com
Date: 2006-12-07 17:00:38 -0500 (Thu, 07 Dec 2006)
New Revision: 58913
Modified:
branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java
branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/ServerSessionPoolLoader.java
branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/ServerSessionPoolLoaderMBean.java
branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/StdServerSession.java
branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java
branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java
Log:
[JBAS-3657][JBAS-3511] Port of patch made for client in integrating with WebSphereMQ
Modified: branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java
===================================================================
--- branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java 2006-12-07 22:00:20 UTC (rev 58912)
+++ branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java 2006-12-07 22:00:38 UTC (rev 58913)
@@ -1,23 +1,8 @@
/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
+ * JBoss, the OpenSource J2EE webOS
*
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
@@ -31,10 +16,11 @@
/**
* Defines the model for creating <tt>ServerSessionPoolFactory</tt> objects. <p>
*
+ * Created: Wed Nov 29 15:55:21 2000
+ *
* @author <a href="mailto:peter.antman at tim.se">Peter Antman</a> .
* @author <a href="mailto:hiram.chirino at jboss.org">Hiram Chirino</a> .
- * @author <a href="mailto:jason at planet57.com">Jason Dillon</a>
- * @author <a href="mailto:adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="mailto:weston.price at jboss.com">Weston Price</a>
* @version $Revision$
*/
public interface ServerSessionPoolFactory
@@ -84,7 +70,29 @@
* @return A new pool.
* @throws JMSException for any error
*/
- ServerSessionPool getServerSessionPool(Destination destination, Connection con, int minSession, int maxSession,
- long keepAlive, boolean isTransacted, int ack, boolean useLocalTX, MessageListener listener)
- throws JMSException;
+ ServerSessionPool getServerSessionPool(Destination destination, Connection con, int minSession, int maxSession, long keepAlive, boolean isTransacted, int ack,
+ boolean useLocalTX, MessageListener listener) throws JMSException;
+
+ /**
+ * Create a new <tt>ServerSessionPool</tt>.
+ *
+ * @param destination the destination
+ * @param con the jms connection
+ * @param minSession the minimum number of sessions
+ * @param maxSession the maximum number of sessions
+ * @param keepAlive the time to keep sessions alive
+ * @param isTransacted whether the pool is transacted
+ * @param ack the acknowledgement method
+ * @param listener the listener
+ * @param useLocalTX whether to use local transactions
+ * @param lazy whether the pool is lazily initialized
+ * @param recycle whether or not idle sessions are recycled
+ * @param idleTimeout the time between runs of the reaper thread
+ * @return A new pool.
+ * @throws JMSException for any error
+ */
+ ServerSessionPool getServerSessionPool(Destination destination, Connection con, int minSession, int maxSession, long keepAlive, boolean isTransacted, int ack,
+ boolean useLocalTX, boolean lazy, boolean recycle, long idleTimeout, boolean destroyOnError, String supportCodeUrl, String sorterClassName, boolean validateLinkedExcpetion, MessageListener listener) throws JMSException;
+
+
}
\ No newline at end of file
Modified: branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/ServerSessionPoolLoader.java
===================================================================
--- branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/ServerSessionPoolLoader.java 2006-12-07 22:00:20 UTC (rev 58912)
+++ branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/ServerSessionPoolLoader.java 2006-12-07 22:00:38 UTC (rev 58913)
@@ -1,30 +1,14 @@
/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
+ * JBoss, the OpenSource J2EE webOS
*
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
import javax.management.ObjectName;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-
import org.jboss.util.naming.NonSerializableFactory;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.tm.XidFactoryMBean;
@@ -32,12 +16,17 @@
/**
* A loader for <tt>ServerSessionPools</tt>.
*
+ * <p>Created: Wed Nov 29 16:14:46 2000
+ *
* @author <a href="mailto:peter.antman at tim.se">Peter Antman</a>.
* @author <a href="mailto:jason at planet57.com">Jason Dillon</a>
- * @author <a href="mailto:adrian at jboss.com">Adrian Brock</a>
* @version $Revision$
+ *
+ * @jmx.mbean extends="org.jboss.system.ServiceMBean"
*/
-public class ServerSessionPoolLoader extends ServiceMBeanSupport implements ServerSessionPoolLoaderMBean
+public class ServerSessionPoolLoader
+ extends ServiceMBeanSupport
+ implements ServerSessionPoolLoaderMBean
{
/** The factory used to create server session pools. */
private ServerSessionPoolFactory poolFactory;
@@ -50,88 +39,138 @@
private ObjectName xidFactory;
+ /**
+ * Set the pool name.
+ *
+ * @param name The pool name.
+ *
+ * @jmx:managed-attribute
+ */
public void setPoolName(final String name)
{
this.name = name;
}
-
+
+ /**
+ * Get the pool name.
+ *
+ * @return The pool name.
+ *
+ * @jmx:managed-attribute
+ */
public String getPoolName()
{
return name;
}
+ /**
+ * Set the classname of pool factory to use.
+ *
+ * @param classname The name of the pool factory class.
+ *
+ * @jmx:managed-attribute
+ */
public void setPoolFactoryClass(final String classname)
{
this.poolFactoryClass = classname;
}
+ /**
+ * Get the classname of pool factory to use.
+ *
+ * @return The name of the pool factory class.
+ *
+ * @jmx:managed-attribute
+ */
public String getPoolFactoryClass()
{
return poolFactoryClass;
}
+
+
+ /**
+ * mbean get-set pair for field xidFactory
+ * Get the value of xidFactory
+ * @return value of xidFactory
+ *
+ * @jmx:managed-attribute
+ */
public ObjectName getXidFactory()
{
return xidFactory;
}
-
+
+
+ /**
+ * Set the value of xidFactory
+ * @param xidFactory Value to assign to xidFactory
+ *
+ * @jmx:managed-attribute
+ */
public void setXidFactory(final ObjectName xidFactory)
{
this.xidFactory = xidFactory;
}
+
+
+
+ /**
+ * Start the service.
+ *
+ * <p>Bind the pool factory into JNDI.
+ *
+ * @throws Exception
+ */
protected void startService() throws Exception
{
- XidFactoryMBean xidFactoryObj = (XidFactoryMBean) getServer().getAttribute(xidFactory, "Instance");
+ XidFactoryMBean xidFactoryObj = (XidFactoryMBean)getServer().getAttribute(xidFactory, "Instance");
Class cls = Class.forName(poolFactoryClass);
- poolFactory = (ServerSessionPoolFactory) cls.newInstance();
+ poolFactory = (ServerSessionPoolFactory)cls.newInstance();
poolFactory.setName(name);
poolFactory.setXidFactory(xidFactoryObj);
- log.debug("initialized with pool factory: " + poolFactory);
+ if (log.isDebugEnabled())
+ log.debug("initialized with pool factory: " + poolFactory);
InitialContext ctx = new InitialContext();
String name = poolFactory.getName();
String jndiname = "java:/" + name;
- try
- {
+ try {
NonSerializableFactory.rebind(ctx, jndiname, poolFactory);
- log.debug("pool factory " + name + " bound to " + jndiname);
+ log.debug("pool factory " + name + " bound to " + jndiname);
}
- finally
- {
+ finally {
ctx.close();
}
}
+ /**
+ * Stop the service.
+ *
+ * <p>Unbind from JNDI.
+ */
protected void stopService()
{
// Unbind from JNDI
InitialContext ctx = null;
- try
- {
+ try {
ctx = new InitialContext();
String name = poolFactory.getName();
String jndiname = "java:/" + name;
-
+
ctx.unbind(jndiname);
NonSerializableFactory.unbind(jndiname);
log.debug("pool factory " + name + " unbound from " + jndiname);
}
- catch (NamingException ignore)
- {
- }
- finally
- {
- if (ctx != null)
- {
- try
- {
+ catch (NamingException ignore) {}
+ finally {
+ if (ctx != null) {
+ try {
ctx.close();
}
- catch (NamingException ignore)
- {
- }
+ catch (NamingException ignore) {}
}
}
}
Modified: branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/ServerSessionPoolLoaderMBean.java
===================================================================
--- branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/ServerSessionPoolLoaderMBean.java 2006-12-07 22:00:20 UTC (rev 58912)
+++ branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/ServerSessionPoolLoaderMBean.java 2006-12-07 22:00:38 UTC (rev 58913)
@@ -1,75 +1,52 @@
/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
+ * JBoss, the OpenSource J2EE webOS
*
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
-import javax.management.ObjectName;
-
/**
* MBean interface.
- *
- * @author <a href="mailto:adrian at jboss.com">Adrian Brock</a>
- * @version $Revision$
+ * @author <a href="mailto:weston.price at jboss.com">Weston Price</a>
*/
public interface ServerSessionPoolLoaderMBean extends org.jboss.system.ServiceMBean
{
+
/**
* Set the pool name.
- *
* @param name The pool name.
*/
- void setPoolName(String name);
+ void setPoolName(java.lang.String name);
/**
* Get the pool name.
- *
* @return The pool name.
*/
- String getPoolName();
+ java.lang.String getPoolName();
/**
* Set the classname of pool factory to use.
- *
* @param classname The name of the pool factory class.
*/
- void setPoolFactoryClass(String classname);
+ void setPoolFactoryClass(java.lang.String classname);
/**
* Get the classname of pool factory to use.
- *
* @return The name of the pool factory class.
*/
- String getPoolFactoryClass();
+ java.lang.String getPoolFactoryClass();
/**
* mbean get-set pair for field xidFactory Get the value of xidFactory
- *
* @return value of xidFactory
*/
- ObjectName getXidFactory();
+ javax.management.ObjectName getXidFactory();
/**
* Set the value of xidFactory
- *
* @param xidFactory Value to assign to xidFactory
*/
- void setXidFactory(ObjectName xidFactory);
+ void setXidFactory(javax.management.ObjectName xidFactory);
+
}
Modified: branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/StdServerSession.java
===================================================================
--- branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/StdServerSession.java 2006-12-07 22:00:20 UTC (rev 58912)
+++ branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/StdServerSession.java 2006-12-07 22:00:38 UTC (rev 58913)
@@ -1,23 +1,8 @@
/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
+ * JBoss, the OpenSource J2EE webOS
*
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
@@ -33,6 +18,7 @@
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+
import org.jboss.logging.Logger;
import org.jboss.tm.TransactionManagerService;
import org.jboss.tm.XidFactoryMBean;
@@ -40,68 +26,75 @@
/**
* An implementation of ServerSession. <p>
*
- * @author <a href="mailto:peter.antman at tim.se">Peter Antman</a> .
- * @author <a href="mailto:jason at planet57.com">Jason Dillon</a>
- * @author <a href="mailto:hiram.chirino at jboss.org">Hiram Chirino</a> .
- * @author <a href="mailto:adrian at jboss.com">Adrian Brock</a>
- * @version $Revision$
+ * Created: Thu Dec 7 18:25:40 2000
+ *
+ * @author <a href="mailto:peter.antman at tim.se">Peter Antman</a> .
+ * @author <a href="mailto:jason at planet57.com">Jason Dillon</a>
+ * @author <a href="mailto:hiram.chirino at jboss.org">Hiram Chirino</a> .
+ * @author <a href="mailto:weston.price at jboss.com">Weston Price</a>
+ * @version $Revision$
*/
-public class StdServerSession implements Runnable, ServerSession, MessageListener
+public class StdServerSession
+ implements Runnable, ServerSession, MessageListener
{
- /** Instance logger. */
+ /**
+ * Instance logger.
+ */
static Logger log = Logger.getLogger(StdServerSession.class);
-
- /** The server session pool which we belong to. */
+
+ /**
+ * The server session pool which we belong to.
+ */
private StdServerSessionPool serverSessionPool;
-
- /** Our session resource. */
+
+ /**
+ * Our session resource.
+ */
private Session session;
-
- /** Our XA session resource. */
+
+ /**
+ * Our XA session resource.
+ */
private XASession xaSession;
-
- /** The transaction manager that we will use for transactions. */
+
+ /**
+ * The transaction manager that we will use for transactions.
+ */
private TransactionManager tm;
-
+
/**
* Use the session's XAResource directly if we have an JBossMQ XASession.
* this allows us to get around the TX timeout problem when you have
* extensive message processing.
*/
private boolean useLocalTX;
-
- /** The listener to delegate calls, to. In our case the container invoker. */
+
+ /**
+ * The listener to delegate calls to. In our case, the container invoker.
+ */
private MessageListener delegateListener;
private XidFactoryMBean xidFactory;
-
+
+ private long lastUse;
+
+ private boolean hasPermit;
+
+ private boolean hasError = false;
+
+ private JMSExceptionSorter sorter;
+
+
/**
- * @deprecated
- * @todo these appeared in jboss-head where are they used?
- */
- public TransactionManager getTransactionManager()
- {
- return tm;
- }
-
- /**
- * @deprecated
- * @todo these appeared in jboss-head where are they used?
- */
- public void setTransactionManager(TransactionManager transactionManager)
- {
- this.tm = transactionManager;
- }
-
- /**
* Create a <tt>StdServerSession</tt> .
*
- * @param pool The server session pool which we belong to.
- * @param session Our session resource.
- * @param xaSession Our XA session resource.
- * @param delegateListener Listener to call when messages arrives.
+ * @param pool The server session pool which we belong to.
+ * @param session Our session resource.
+ * @param xaSession Our XA session resource.
+ * @param delegateListener Listener to call when messages arrive.
* @param useLocalTX Will this session be used in a global TX (we can optimize with 1 phase commit)
- * @throws JMSException Transation manager was not found.
+ * @throws JMSException Transaction manager was not found.
+ * @exception JMSException Description of Exception
*/
StdServerSession(final StdServerSessionPool pool,
final Session session,
@@ -109,74 +102,86 @@
final MessageListener delegateListener,
boolean useLocalTX,
final XidFactoryMBean xidFactory,
- final TransactionManager tm)
- throws JMSException
+ final JMSExceptionSorter sorter)
+ throws JMSException
{
+
+
this.serverSessionPool = pool;
this.session = session;
this.xaSession = xaSession;
this.delegateListener = delegateListener;
- if (xaSession == null)
+
+ if(xaSession == null)
useLocalTX = false;
+
this.useLocalTX = useLocalTX;
- this.xidFactory = xidFactory;
- this.tm = tm;
-
- log.debug("initializing (pool, session, xaSession, useLocalTX): " +
- pool + ", " + session + ", " + xaSession + ", " + useLocalTX);
-
- // Set out self as message listener
- if (xaSession != null)
+ this.xidFactory = xidFactory;
+ this.sorter = sorter;
+
+ if (log.isDebugEnabled())
+ log.debug("initializing (pool, session, xaSession, useLocalTX): " +
+ pool + ", " + session + ", " + xaSession + ", " + useLocalTX);
+
+ if (StdServerSessionPool.USE_OLD && xaSession != null)
+ {
xaSession.setMessageListener(this);
+
+ }
else
- session.setMessageListener(this);
-
- if (tm == null)
{
- InitialContext ctx = null;
- try
+ session.setMessageListener(this);
+ }
+
+ InitialContext ctx = null;
+
+ try
+ {
+
+ ctx = new InitialContext();
+ tm = (TransactionManager) ctx.lookup(TransactionManagerService.JNDI_NAME);
+ lastUse = System.currentTimeMillis();
+
+ }
+ catch (Exception e)
+ {
+ throw new JMSException("Transation manager was not found");
+ }
+
+ finally
+ {
+ if (ctx != null)
{
- ctx = new InitialContext();
- this.tm = (TransactionManager) ctx.lookup(TransactionManagerService.JNDI_NAME);
- }
- catch (Exception e)
- {
- throw new JMSException("Transation manager was not found");
- }
- finally
- {
- if (ctx != null)
+ try
{
- try
- {
- ctx.close();
- }
- catch (Exception ignore)
- {
- }
+ ctx.close();
}
+ catch (Exception ignore)
+ {
+ }
}
}
}
-
+
+
/**
* Returns the session. <p>
- * <p/>
+ *
* This simply returns what it has fetched from the connection. It is up to
* the jms provider to typecast it and have a private API to stuff messages
* into it.
*
- * @return The session.
- * @throws JMSException Description of Exception
+ * @return The session.
+ * @exception JMSException Description of Exception
*/
public Session getSession() throws JMSException
{
- if (xaSession != null)
+ if (StdServerSessionPool.USE_OLD && xaSession != null)
return xaSession;
else
return session;
}
-
+
/**
* Runs in an own thread, basically calls the session.run(), it is up to the
* session to have been filled with messages and it will run against the
@@ -186,30 +191,61 @@
public void run()
{
boolean trace = log.isTraceEnabled();
+
if (trace)
- log.trace("running...");
+ log.trace("Running StdServerSession" + this);
+
+ TransactionDemarcationStrategy td = null;
+
+
+ if (StdServerSessionPool.USE_OLD == false)
+ {
+ td = createTransactionDemarcation();
+
+ if (td == null)
+ return;
+ }
+
try
{
- if (xaSession != null)
+
+ if (StdServerSessionPool.USE_OLD && xaSession != null)
+ {
xaSession.run();
+
+ }
else
+ {
session.run();
+
+ }
+
+ }catch(Throwable t)
+ {
+
+ if (td != null)
+ {
+ td.error();
+
+ }
+
+ handleSessionFailure(t);
+
}
finally
{
- if (trace)
- log.trace("recycling...");
-
+ if(td != null)
+ {
+ td.end();
+
+ }
recycle();
-
- if (trace)
- log.trace("finished run");
}
}
/**
* Will get called from session for each message stuffed into it.
- * <p/>
+ *
* Starts a transaction with the TransactionManager
* and enlists the XAResource of the JMS XASession if a XASession was
* available. A good JMS implementation should provide the XASession for use
@@ -220,107 +256,324 @@
* not be too common (JBossMQ provides XASessions).
*/
public void onMessage(Message msg)
- {
+ {
boolean trace = log.isTraceEnabled();
- if (trace)
+
+ if( trace )
log.trace("onMessage running (pool, session, xaSession, useLocalTX): " +
- ", " + session + ", " + xaSession + ", " + useLocalTX);
-
- // Used if run with useLocalTX if true
- Xid localXid = null;
- boolean localRollbackFlag = false;
- // Used if run with useLocalTX if false
- Transaction trans = null;
+ ", " + session + ", " + xaSession + ", " + useLocalTX);
+
+ TransactionDemarcationStrategy td = null;
+
+ if (StdServerSessionPool.USE_OLD)
+ {
+ td = createTransactionDemarcation();
+ if (td == null)
+ return;
+ }
+
try
{
+ delegateListener.onMessage(msg);
+
+ }catch(Throwable t)
+ {
+ log.error("Unexpected error invoking on message " + msg, t);
+
+ if(td != null)
+ td.error();
+
+ }
+ finally
+ {
+ if(td != null)
+ td.end();
+ }
+
+
+
+ if( trace )
+ log.trace("onMessage done");
+ }
- if (useLocalTX)
+ /**
+ * Start the session and begin consuming messages.
+ *
+ * @throws JMSException No listener has been specified.
+ */
+ public void start() throws JMSException
+ {
+ log.trace("starting invokes on server session");
+
+ if (session != null)
+ {
+ try
{
- // Use JBossMQ One Phase Commit to commit the TX
- localXid = xidFactory.newXid();//new XidImpl();
- XAResource res = xaSession.getXAResource();
- res.start(localXid, XAResource.TMNOFLAGS);
+ serverSessionPool.getExecutor().execute(this);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+ else
+ {
+ throw new JMSException("No listener has been specified");
+ }
+ }
- if (trace)
- log.trace("Using optimized 1p commit to control TX.");
+ /**
+ * Called by the ServerSessionPool when the sessions should be closed.
+ */
+ void close()
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
}
- else
+ catch (Exception ignore)
{
+ }
- // Use the TM to control the TX
- tm.begin();
- trans = tm.getTransaction();
+ session = null;
+ }
- if (xaSession != null)
+ if (xaSession != null)
+ {
+ try
+ {
+ xaSession.close();
+ }
+ catch (Exception ignore)
+ {
+ }
+ xaSession = null;
+ }
+
+ log.debug("closed");
+ }
+
+ /**
+ * This method is called by the ServerSessionPool when it is ready to be
+ * recycled into the pool
+ */
+ void recycle()
+ {
+ serverSessionPool.recycle(this, false);
+ }
+
+ boolean getHasError()
+ {
+ return hasError;
+
+ }
+ void setHasError(boolean destroy)
+ {
+ this.hasError = destroy;
+ }
+
+ boolean isTimedOut(long timeout)
+ {
+ return this.lastUse < timeout;
+ }
+
+ void setLastUsed(long lastUsed)
+ {
+ this.lastUse = lastUsed;
+ }
+
+ public void setHasPermit(boolean permit)
+ {
+ this.hasPermit = permit;
+ }
+
+ public boolean getHasPermit()
+ {
+ return this.hasPermit;
+ }
+
+ private TransactionDemarcationStrategy createTransactionDemarcation()
+ {
+ return new DemarcationStrategyFactory().getStrategy();
+ }
+
+ private void handleSessionFailure(final Throwable t)
+ {
+ if(t instanceof JMSException)
+ {
+ JMSException jmex = (JMSException)t;
+
+ if(log.isTraceEnabled())
+ {
+ log.trace("JMSException thrown in: " + this + " ", jmex);
+ log.trace("JMSException error code: " + jmex.getErrorCode());
+
+ Exception linked = jmex.getLinkedException();
+
+ if(linked != null)
{
- XAResource res = xaSession.getXAResource();
- if (!trans.enlistResource(res))
+ log.trace("JMSLinkedException is ", jmex);
+
+ if(linked instanceof JMSException)
{
- throw new JMSException("could not enlist resource");
+ JMSException linkedex = (JMSException)linked;
+ log.trace("Linked JMSException error code: " + linkedex.getErrorCode());
}
- if (trace)
- log.trace("XAResource '" + res + "' enlisted.");
}
+
}
- // Call delegate listener
- delegateListener.onMessage(msg);
+
+ if(sorter != null)
+ {
+ setHasError(sorter.isJMSExceptionFatal(jmex));
+ }
+
}
- catch (Exception e)
+ }
+
+
+ private interface TransactionDemarcationStrategy
+ {
+ void error();
+ void end();
+
+ }
+
+ private class DemarcationStrategyFactory
+ {
+
+ public DemarcationStrategyFactory()
{
- log.error("session failed to run; setting rollback only", e);
+ }
- if (useLocalTX)
+ TransactionDemarcationStrategy getStrategy()
+ {
+ try
{
- // Use JBossMQ One Phase Commit to commit the TX
- localRollbackFlag = true;
+ return new TransactionDemarcation();
}
+ catch (Throwable t)
+ {
+ log.error(this + " error creating transaction demarcation ", t);
+ return null;
+ }
+ }
+
+ }
+ private class TransactionDemarcation implements TransactionDemarcationStrategy
+ {
+ boolean trace = log.isTraceEnabled();
+
+ Xid localXid = null;
+ boolean localRollbackFlag = false;
+ Transaction trans = null;
+
+ public TransactionDemarcation() throws Throwable
+ {
+ if(useLocalTX)
+ {
+ localXid = xidFactory.newXid();
+ XAResource res = xaSession.getXAResource();
+ res.start(localXid, XAResource.TMNOFLAGS);
+
+ if(trace)
+ log.trace(StdServerSession.this + " using optimized 1p commit to control Tx. xid=" + localXid);
+ }
+ else
+ {
+ tm.begin();
+
+ try
+ {
+ trans = tm.getTransaction();
+
+ if (trace)
+ log.trace(StdServerSession.this + " using tx=" + trans);
+
+ if (xaSession != null)
+ {
+ XAResource res = xaSession.getXAResource();
+
+ if (!trans.enlistResource(res))
+ {
+ throw new JMSException("could not enlist resource");
+ }
+ if (trace)
+ log.trace(StdServerSession.this + " XAResource '" + res + "' enlisted.");
+ }
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ tm.rollback();
+ }
+ catch (Throwable ignored)
+ {
+ log.trace(StdServerSession.this + " ignored error rolling back after failed enlist", ignored);
+ }
+ throw t;
+ }
+ }
+
+ }
+
+
+ public void error()
+ {
+ if(useLocalTX)
+ {
+ localRollbackFlag = true;
+ }
else
{
- // Mark for tollback TX via TM
try
{
- // The transaction will be rolledback in the finally
+
if (trace)
- log.trace("Using TM to mark TX for rollback.");
+ log.trace(StdServerSession.this + " using TM to mark TX for rollback tx=" + trans);
trans.setRollbackOnly();
}
- catch (Exception x)
+ catch (Throwable t)
{
- log.error("failed to set rollback only", x);
+ handleSessionFailure(t);
+ log.error(StdServerSession.this + " failed to set rollback only", t);
}
+
}
-
+
}
- finally
+
+ public void end()
{
+
try
{
- if (useLocalTX)
+
+ if(useLocalTX)
{
- if (localRollbackFlag == true)
+ if(localRollbackFlag)
{
- if (trace)
- log.trace("Using optimized 1p commit to rollback TX.");
-
XAResource res = xaSession.getXAResource();
res.end(localXid, XAResource.TMSUCCESS);
- res.rollback(localXid);
-
+ res.rollback(localXid);
}
else
{
- if (trace)
- log.trace("Using optimized 1p commit to commit TX.");
-
XAResource res = xaSession.getXAResource();
res.end(localXid, XAResource.TMSUCCESS);
res.commit(localXid, true);
+
}
+
}
else
{
+
// Use the TM to commit the Tx (assert the correct association)
Transaction currentTx = tm.getTransaction();
+
if (trans.equals(currentTx) == false)
throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
@@ -328,7 +581,7 @@
if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
{
if (trace)
- log.trace("Rolling back JMS transaction");
+ log.trace(StdServerSession.this + " rolling back JMS transaction tx=" + trans);
// actually roll it back
tm.rollback();
@@ -347,7 +600,7 @@
// a) everything goes well
// b) app. exception was thrown
if (trace)
- log.trace("Commiting the JMS transaction");
+ log.trace(StdServerSession.this + " commiting the JMS transaction tx=" + trans);
tm.commit();
// NO XASession? then manually commit. This is not so good but
@@ -359,92 +612,24 @@
}
else
{
- if(trace)
- log.trace(StdServerSession.this + "transaction already ended");
-
tm.suspend();
-
+
if (xaSession == null && serverSessionPool.isTransacted())
{
- session.rollback();
+ session.commit();
}
-
}
+
}
+
+
}
- catch (Exception e)
+ catch(Throwable t)
{
- log.error("failed to commit/rollback", e);
+ log.error(StdServerSession.this + "failed to commit/rollback", t);
+ handleSessionFailure(t);
}
}
- if (trace)
- log.trace("onMessage done");
}
+}
- /**
- * Start the session and begin consuming messages.
- *
- * @throws JMSException No listener has been specified.
- */
- public void start() throws JMSException
- {
- log.trace("starting invokes on server session");
-
- if (session != null)
- {
- try
- {
- serverSessionPool.getExecutor().execute(this);
- }
- catch (InterruptedException ignore)
- {
- }
- }
- else
- {
- throw new JMSException("No listener has been specified");
- }
- }
-
- /**
- * Called by the ServerSessionPool when the sessions should be closed.
- */
- void close()
- {
- if (session != null)
- {
- try
- {
- session.close();
- }
- catch (Exception ignore)
- {
- }
-
- session = null;
- }
-
- if (xaSession != null)
- {
- try
- {
- xaSession.close();
- }
- catch (Exception ignore)
- {
- }
- xaSession = null;
- }
-
- log.debug("closed");
- }
-
- /**
- * This method is called by the ServerSessionPool when it is ready to be
- * recycled intot the pool
- */
- void recycle()
- {
- serverSessionPool.recycle(this);
- }
-}
Modified: branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java
===================================================================
--- branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java 2006-12-07 22:00:20 UTC (rev 58912)
+++ branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java 2006-12-07 22:00:38 UTC (rev 58913)
@@ -1,26 +1,12 @@
/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
+ * JBoss, the OpenSource J2EE webOS
*
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
-
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -41,78 +27,213 @@
import javax.jms.XASession;
import javax.jms.XATopicConnection;
import javax.jms.XATopicSession;
-import javax.transaction.TransactionManager;
import org.jboss.logging.Logger;
import org.jboss.tm.XidFactoryMBean;
import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
/**
- * Implementation of ServerSessionPool.
+ * Implementation of ServerSessionPool. <p>
*
+ * Created: Thu Dec 7 17:02:03 2000
+ *
* @author <a href="mailto:peter.antman at tim.se">Peter Antman</a> .
* @author <a href="mailto:hiram.chirino at jboss.org">Hiram Chirino</a> .
- * @author <a href="mailto:adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="mailto:weston.price at jboss.com">Weston Price</a>
* @version $Revision$
*/
-public class StdServerSessionPool implements ServerSessionPool
+public class StdServerSessionPool
+ implements ServerSessionPool
{
- /** The thread group which session workers will run. */
- private static ThreadGroup threadGroup = new ThreadGroup("ASF Session Pool Threads");
+ public static final boolean USE_OLD;
+
+ static
+ {
+ USE_OLD = ((Boolean) AccessController.doPrivileged(new PrivilegedAction()
+ {
+ public Object run()
+ {
+ return new Boolean(System.getProperty("org.jboss.jms.asf.useold", "false"));
+ }
+ })).booleanValue();
+ }
+ /**
+ * The thread group which session workers will run.
+ */
+ private static ThreadGroup threadGroup =
+ new ThreadGroup("ASF Session Pool Threads");
- /** Instance logger. */
+ /**
+ * Instance logger.
+ */
private final Logger log = Logger.getLogger(this.getClass());
/** The minimum size of the pool */
private int minSize;
- /** The size of the pool. */
+ /**
+ * The size of the pool.
+ */
private int poolSize;
- /** The message acknowledgment mode. */
+ /** How long to keep sessions alive */
+ private long keepAlive;
+
+ /**
+ * The message acknowledgment mode.
+ */
private int ack;
- /** Is the bean container managed? */
+ /**
+ * Is the bean container managed?
+ */
private boolean useLocalTX;
- /** True if this is a transacted session. */
+ /**
+ * True if this is a transacted session.
+ */
private boolean transacted;
-
- /** The destination. */
+
+ /** The lazyInitialization */
+ private boolean lazyInitialization;
+
+ /** The recycleIdleSessions */
+ private boolean recycleIdleSessions;
+
+ /** The idleTimeOut */
+ private long idleTimeOut;
+
+ /**
+ * The destination.
+ */
private Destination destination;
- /** The session connection. */
+ /**
+ * The session connection.
+ */
private Connection con;
- /** The message listener for the session. */
+ /**
+ * The message listener for the session.
+ */
private MessageListener listener;
- /** The list of ServerSessions. */
+ /**
+ * The list of ServerSessions.
+ */
private List sessionPool;
- /** The executor for processing messages? */
+ /**
+ * The executor for processing messages?
+ */
private PooledExecutor executor;
- /** Used to signal when the Pool is being closed down */
+ /**
+ * Used to signal when the Pool is being closed down
+ */
private boolean closing = false;
-
- /** Used during close down to wait for all server sessions to be returned and closed. */
+
+ /**
+ * Used during close down to wait for all server sessions to be returned and
+ * closed.
+ */
private int numServerSessions = 0;
-
+
+ private boolean destroyOnError = false;
+
+ private JMSExceptionSorter sorter;
+
private XidFactoryMBean xidFactory;
+
+ private FIFOSemaphore permits;
+
+ private SessionPoolStatisticsCollector counter = new SessionPoolStatisticsCollector();
+
+ private JMSSupportCodeHandler handler;
+
+ private String supportCodeUrl;
+
+ private String sorterClassName;
+
+ public StdServerSessionPool(final Destination destination,
+ final Connection con,
+ final boolean transacted,
+ final int ack,
+ final boolean useLocalTx,
+ final MessageListener listener,
+ final int minSession,
+ final int maxSession,
+ final long keepAlive,
+ final boolean lazy,
+ final boolean recycle,
+ final long idleTimeout,
+ final boolean destroyOnError,
+ final String supportCodeUrl,
+ final String sorterClassName,
+ final boolean validateLinkedException,
+ final XidFactoryMBean xidFactory) throws JMSException
+ {
+
+ this.destination = destination;
+ this.con = con;
+ this.ack = ack;
+ this.listener = listener;
+ this.transacted = transacted;
+ this.minSize = minSession;
+ this.poolSize = maxSession;
+ this.keepAlive = keepAlive;
+ this.sessionPool = new ArrayList(maxSession);
+ this.useLocalTX = useLocalTx;
+ this.xidFactory = xidFactory;
+
+ // setup the worker pool
+ executor = new MyPooledExecutor(poolSize);
+ executor.setMinimumPoolSize(minSize);
+ executor.setKeepAliveTime(keepAlive);
+ executor.waitWhenBlocked();
+ executor.setThreadFactory(new DefaultThreadFactory());
+
+ permits = new FIFOSemaphore(maxSession);
+
+ this.lazyInitialization = lazy;
+ this.recycleIdleSessions = recycle;
+ this.idleTimeOut = idleTimeout;
+ this.destroyOnError = destroyOnError;
+ // finish initializing the session
+ if(!lazyInitialization)
+ {
+ log.debug("StdServerSessionPool is configured for non-lazy inititalization. " + poolSize + " sessions being created for use." );
+ create(this.poolSize);
- private TransactionManager tm;
-
+ }else
+ {
+ log.debug("StdServerSessionPool is specified as lazy initialization. " + poolSize + " session(s) being created");
+ create(this.minSize);
+ }
+
+ if(recycleIdleSessions)
+ {
+ log.debug("Session Pool registering for background recycling at interval" + idleTimeout);
+ JmsServerSessionReaper.registerSessionPool(this, idleTimeOut);
+
+ }
+
+ this.supportCodeUrl = supportCodeUrl;
+ this.sorterClassName = sorterClassName;
+
+ handler = new JMSSupportCodeHandler(supportCodeUrl, sorterClassName, validateLinkedException);
+
+ log.debug("Server Session pool set up");
+
+
+ }
/**
- * Construct a <tt>StdServerSessionPool</tt>. Note the maxSession parameter controls
- * both the maximum number of sessions in the pool, as well as the number of listener
- * threads assigned to service requests from the JMS Provider.
- *
- *
- *
+ * Construct a <tt>StdServerSessionPool</tt> using the default pool size.
+ *
* @param destination the destination
* @param con connection to get sessions from
* @param transacted transaction mode when not XA (
@@ -121,8 +242,7 @@
* @param minSession minumum number of sessions in the pool
* @param maxSession maximum number of sessions in the pool
* @param keepAlive the time to keep sessions alive
- * @param xidFactory the xid factory
- * @param tm the transaction manager
+ * @param xidFactory the xid factory
* @exception JMSException Description of Exception
*/
public StdServerSessionPool(final Destination destination,
@@ -134,8 +254,7 @@
final int minSession,
final int maxSession,
final long keepAlive,
- final XidFactoryMBean xidFactory,
- final TransactionManager tm)
+ final XidFactoryMBean xidFactory)
throws JMSException
{
this.destination = destination;
@@ -145,10 +264,10 @@
this.transacted = transacted;
this.minSize = minSession;
this.poolSize = maxSession;
+ this.keepAlive = keepAlive;
this.sessionPool = new ArrayList(maxSession);
this.useLocalTX = useLocalTX;
this.xidFactory = xidFactory;
- this.tm = tm;
// setup the worker pool
executor = new MyPooledExecutor(poolSize);
executor.setMinimumPoolSize(minSize);
@@ -157,10 +276,12 @@
executor.setThreadFactory(new DefaultThreadFactory());
// finish initializing the session
- create();
+ create(this.poolSize);
log.debug("Server Session pool set up");
}
+ // --- JMS API for ServerSessionPool
+
/**
* Get a server session.
*
@@ -175,6 +296,9 @@
try
{
+ //Note, we will block until we get a new one to keep old behavior
+ permits.acquire();
+
while (true)
{
synchronized (sessionPool)
@@ -186,17 +310,17 @@
else if (sessionPool.size() > 0)
{
session = (ServerSession)sessionPool.remove(0);
+ ((StdServerSession)session).setHasPermit(true);
+ counter.inUse();
break;
}
else
{
- try
- {
- sessionPool.wait();
- }
- catch (InterruptedException ignore)
- {
- }
+ log.trace("Creating lazy session.");
+ session = createLazySession();
+ ((StdServerSession)session).setHasPermit(true);
+ counter.inUse();
+ break;
}
}
}
@@ -208,9 +332,13 @@
if( log.isTraceEnabled() )
log.trace("using server session: " + session);
+
+ int size = (int)(poolSize - permits.permits());
+ counter.maxSessionInUse(size);
return session;
}
-
+
+
/**
* Clear the pool, clear out both threads and ServerSessions,
* connection.stop() should be run before this method.
@@ -225,7 +353,11 @@
// ThreadPool won't leave any more threads out.
closing = true;
- log.debug("Clearing " + sessionPool.size() + " from ServerSessionPool");
+ if (log.isDebugEnabled())
+ {
+ log.debug("Clearing " + sessionPool.size() +
+ " from ServerSessionPool");
+ }
Iterator iter = sessionPool.iterator();
while (iter.hasNext())
@@ -239,6 +371,9 @@
sessionPool.clear();
sessionPool.notifyAll();
}
+
+ //Unregister with the Reaper
+ JmsServerSessionReaper.unregisterSessionPool(this);
//Must be outside synchronized block because of recycle method.
executor.shutdownAfterProcessingCurrentlyQueuedTasks();
@@ -258,7 +393,7 @@
}
}
}
-
+
/**
* Get the executor we are using.
*
@@ -268,7 +403,45 @@
{
return executor;
}
+
+ public int getTimedOut()
+ {
+ return counter.getTimedOutCount();
+ }
+ public int getCurrentSessionCount()
+ {
+ return numServerSessions;
+ }
+
+ public int getCurrentSessionInUseCount()
+ {
+ return counter.getInUseCount();
+ }
+
+ public int getSessionMaxInUseCount()
+ {
+ return counter.getMaxSessionInUse();
+ }
+
+ public int getSessionErrorCount()
+ {
+ return counter.getSessionErrorCount();
+
+ }
+
+ public int getSessionCreatedCount()
+ {
+ return counter.getTotalSessionCreated();
+
+ }
+
+ public int getSessionDestroyedCount()
+ {
+ return counter.getSessionDestroyedCount();
+
+ }
+
// --- Protected messages for StdServerSession to use
/**
@@ -280,45 +453,189 @@
{
return transacted;
}
+
+ public void fillToMin() throws JMSException
+ {
+
+ try
+ {
+ permits.acquire();
+
+ synchronized (sessionPool)
+ {
+ if(numServerSessions < minSize)
+ create(minSize);
+
+ }
+
+ }catch(InterruptedException e)
+ {
+ log.debug("Interrupted exception in attemping to fill JMS Session pool to minimum allowance.", e);
+ }
+ finally
+ {
+ permits.release();
+ }
+
+ }
+
+ public void refreshSupportCodeFile()
+ {
+ handler.reloadSupportFile();
+
+ }
+
+ public void loadSupportCodeFile(String fileName)
+ {
+
+ handler.reloadSupportFile(fileName);
+ }
+
+ public Integer[] listCurrentSupportCodes()
+ {
+ return handler.getCurrentCodes();
+
+ }
+
+ public void clearCurrentSupportCodes()
+ {
+
+ handler.clearSupportCodes();
+
+ }
+
+ public void addSupportCodes(final String fileName)
+ {
+ handler.addSupportCodes(fileName);
+ }
+
+ public boolean isCodeInUse(int code)
+ {
+ return handler.isCodeInUse(code);
+
+ }
/**
* Recycle a server session.
*
* @param session Description of Parameter
*/
- void recycle(StdServerSession session)
+ void recycle(StdServerSession session, boolean isTimedOut)
{
- synchronized (sessionPool)
+
+ if(session.getHasPermit())
+ {
+ session.setHasPermit(false);
+ permits.release();
+ }
+
+ synchronized (sessionPool)
{
- if (closing)
+ if(!isTimedOut)
+ counter.decInUse();
+
+ if(session.getHasError())
+ counter.incError();
+
+ if (closing || isTimedOut || (session.getHasError() && destroyOnError))
{
+
+ if(session.getHasError())
+ {
+ counter.incDestroyed();
+
+ }
+
session.close();
+
+
numServerSessions--;
+
if (numServerSessions == 0)
{
//notify clear thread.
sessionPool.notifyAll();
}
+
}
else
{
+ session.setLastUsed(System.currentTimeMillis());
sessionPool.add(session);
sessionPool.notifyAll();
+
if( log.isTraceEnabled() )
log.trace("recycled server session: " + session);
}
}
+
+
+
}
- private void create() throws JMSException
+ // --- Private methods used internally
+
+ private ServerSession createLazySession() throws JMSException
{
- for (int index = 0; index < poolSize; index++)
+ boolean debug = log.isDebugEnabled();
+ Session ses = null;
+ XASession xaSes = null;
+
+ if (debug)
+ log.debug("initializing lazy session with connection: " + con);
+
+ if (destination instanceof Topic && con instanceof XATopicConnection)
{
+ xaSes = ((XATopicConnection)con).createXATopicSession();
+ ses = ((XATopicSession)xaSes).getTopicSession();
+ }
+ else if (destination instanceof Queue && con instanceof XAQueueConnection)
+ {
+ xaSes = ((XAQueueConnection)con).createXAQueueSession();
+ ses = ((XAQueueSession)xaSes).getQueueSession();
+ }
+ else if (destination instanceof Topic && con instanceof TopicConnection)
+ {
+ ses = ((TopicConnection)con).createTopicSession(transacted, ack);
+ log.warn("Using a non-XA TopicConnection. " +
+ "It will not be able to participate in a Global UOW");
+ }
+ else if (destination instanceof Queue && con instanceof QueueConnection)
+ {
+ ses = ((QueueConnection)con).createQueueSession(transacted, ack);
+ log.warn("Using a non-XA QueueConnection. " +
+ "It will not be able to participate in a Global UOW");
+ }
+ else
+ {
+ throw new JMSException("Connection was not reconizable: " + con + " for destination " + destination);
+ }
+
+ // create the server session and add it to the pool - it is up to the
+ // server session to set the listener
+ StdServerSession serverSession = new StdServerSession(this, ses, xaSes,
+ listener, useLocalTX, xidFactory, handler);
+ numServerSessions++;
+ counter.incTotalCreated();
+
+ if (debug)
+ log.debug("" + serverSession);
+
+ return serverSession;
+ }
+
+ private void create(int size) throws JMSException
+ {
+ boolean debug = log.isDebugEnabled();
+
+ for (int index = 0; index < size; index++)
+ {
// Here is the meat, that MUST follow the spec
Session ses = null;
XASession xaSes = null;
- log.debug("initializing with connection: " + con);
+ if (debug)
+ log.debug("initializing with connection: " + con);
if (destination instanceof Topic && con instanceof XATopicConnection)
{
@@ -350,15 +667,76 @@
// create the server session and add it to the pool - it is up to the
// server session to set the listener
StdServerSession serverSession = new StdServerSession(this, ses, xaSes,
- listener, useLocalTX, xidFactory, tm);
-
+ listener, useLocalTX, xidFactory, handler);
+
+ counter.incTotalCreated();
sessionPool.add(serverSession);
numServerSessions++;
-
- log.debug("added server session to the pool: " + serverSession);
+
+ if (debug)
+ log.debug("added server session to the pool: " + serverSession);
}
}
-
+
+
+ void removeTimedOut()
+ {
+
+ long timeout = System.currentTimeMillis() - idleTimeOut;
+ boolean trace = log.isTraceEnabled();
+ List timedOut = new ArrayList();
+
+ while(true)
+ {
+ synchronized (sessionPool)
+ {
+
+ if(sessionPool.size() > 0)
+ {
+ StdServerSession session = (StdServerSession)sessionPool.get(0);
+
+ if (session.isTimedOut(timeout))
+ {
+ sessionPool.remove(0);
+ timedOut.add(session);
+
+ if(trace)
+ {
+ log.trace("Recycling timed out session" + session);
+
+ }
+
+ }
+
+ else
+ {
+ break;
+ }
+
+ }
+
+ else
+ break;
+ }
+
+ }
+
+ counter.timedOut(timedOut.size());
+
+ for(Iterator iter = timedOut.iterator(); iter.hasNext();)
+ {
+ StdServerSession session = (StdServerSession)iter.next();
+ log.trace("Recycling timed out StdServerSession" + session);
+ recycle(session, true);
+
+ }
+
+ if(!closing && minSize > 0)
+ JmsSessionPoolFiller.fillPool(this);
+
+ }
+
+
/**
* A pooled executor where the minimum pool size
* threads are kept alive
@@ -414,4 +792,90 @@
return thread;
}
}
+
+ //Simple class to record SessionStatistics
+ private static class SessionPoolStatisticsCollector
+ {
+ private int timedOut;
+ private int inUse;
+ private int maxInUse;
+ private int error;
+ private int totalCreated;
+ private int destroyedCount;
+
+ synchronized void incTotalCreated()
+ {
+ ++totalCreated;
+
+ }
+
+ synchronized void decInUse()
+ {
+ --inUse;
+ }
+
+ synchronized void incDestroyed()
+ {
+ ++destroyedCount;
+
+ }
+
+ synchronized void inUse()
+ {
+ ++inUse;
+ }
+
+ synchronized void timedOut(int count)
+ {
+ timedOut += count;
+ }
+ synchronized void timedOut()
+ {
+ ++timedOut;
+
+ }
+ synchronized void incError()
+ {
+ ++error;
+
+ }
+ synchronized void maxSessionInUse(int count)
+ {
+ if(count > maxInUse)
+ maxInUse = count;
+ }
+
+ private int getMaxSessionInUse()
+ {
+ return maxInUse;
+ }
+
+ private int getTimedOutCount()
+ {
+ return timedOut;
+ }
+
+ private int getInUseCount()
+ {
+ return inUse;
+ }
+
+ private int getSessionErrorCount()
+ {
+ return error;
+
+ }
+
+ private int getTotalSessionCreated()
+ {
+ return totalCreated;
+ }
+
+ private int getSessionDestroyedCount()
+ {
+ return destroyedCount;
+
+ }
+ }
+
}
Modified: branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java
===================================================================
--- branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java 2006-12-07 22:00:20 UTC (rev 58912)
+++ branches/JBoss_4_0_5_GA_JBAS_3657/server/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java 2006-12-07 22:00:38 UTC (rev 58913)
@@ -1,56 +1,42 @@
/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
+ * JBoss, the OpenSource J2EE webOS
*
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
import java.io.Serializable;
-
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
+
import javax.jms.ServerSessionPool;
import javax.transaction.TransactionManager;
import org.jboss.tm.XidFactoryMBean;
/**
- * An implementation of ServerSessionPoolFactory.
+ * An implementation of ServerSessionPoolFactory. <p>
*
+ * Created: Fri Dec 22 09:47:41 2000
+ *
* @author <a href="mailto:peter.antman at tim.se">Peter Antman</a> .
* @author <a href="mailto:hiram.chirino at jboss.org">Hiram Chirino</a> .
- * @author <a href="mailto:adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="mailto:weston.price at jboss.com">Weston Price</a>
* @version $Revision$
*/
-public class StdServerSessionPoolFactory implements ServerSessionPoolFactory, Serializable
+public class StdServerSessionPoolFactory
+ implements ServerSessionPoolFactory, Serializable
{
- private static final long serialVersionUID = 4969432475779524576L;
+ static final long serialVersionUID = 4969432475779524576L;
/** The name of this factory. */
private String name;
private XidFactoryMBean xidFactory;
- private TransactionManager transactionManager;
-
public StdServerSessionPoolFactory()
{
super();
@@ -75,20 +61,19 @@
{
return xidFactory;
}
+
- public void setTransactionManager(TransactionManager transactionManager)
+ public javax.jms.ServerSessionPool getServerSessionPool(Destination destination, Connection con, int minSession, int maxSession, long keepAlive, boolean isTransacted, int ack, boolean useLocalTX, boolean lazyInit, boolean recycle, long idleTimeout, boolean destroySessionOnError, String supportCodeUrl, String sorterClassName, boolean validateLinkedException, javax.jms.MessageListener listener) throws javax.jms.JMSException
{
- this.transactionManager = transactionManager;
+
+ ServerSessionPool pool = (ServerSessionPool)new StdServerSessionPool(destination, con, isTransacted, ack, useLocalTX, listener, minSession, maxSession, keepAlive, lazyInit, recycle, idleTimeout, destroySessionOnError, supportCodeUrl, sorterClassName, validateLinkedException, xidFactory);
+ return pool;
}
-
- public TransactionManager getTransactionManager()
+
+ public javax.jms.ServerSessionPool getServerSessionPool(Destination destination, Connection con, int minSession, int maxSession, long keepAlive, boolean isTransacted, int ack, boolean useLocalTX, javax.jms.MessageListener listener) throws javax.jms.JMSException
{
- return transactionManager;
- }
-
- public ServerSessionPool getServerSessionPool(Destination destination, Connection con, int minSession, int maxSession, long keepAlive, boolean isTransacted, int ack, boolean useLocalTX, MessageListener listener) throws JMSException
- {
- ServerSessionPool pool = new StdServerSessionPool(destination, con, isTransacted, ack, useLocalTX, listener, minSession, maxSession, keepAlive, xidFactory, transactionManager);
+
+ ServerSessionPool pool = (ServerSessionPool)new StdServerSessionPool(destination, con, isTransacted, ack, useLocalTX, listener, minSession, maxSession, keepAlive, xidFactory);
return pool;
}
}
More information about the jboss-cvs-commits
mailing list