[Jboss-cvs] JBossAS SVN: r55978 - branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 16 06:40:50 EDT 2006
Author: weston.price at jboss.com
Date: 2006-08-16 06:40:48 -0400 (Wed, 16 Aug 2006)
New Revision: 55978
Modified:
branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java
branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java
branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java
Log:
[JBAS-3511] JMS asf server improvements.
Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java 2006-08-16 09:48:23 UTC (rev 55977)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java 2006-08-16 10:40:48 UTC (rev 55978)
@@ -75,6 +75,8 @@
private XidFactoryMBean xidFactory;
+ private long lastUse;
+
/**
* Create a <tt>StdServerSession</tt> .
*
@@ -425,5 +427,10 @@
serverSessionPool.recycle(this);
}
+ boolean isTimedOut(long timeout)
+ {
+ return this.lastUse < timeout;
+ }
+
}
Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java 2006-08-16 09:48:23 UTC (rev 55977)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java 2006-08-16 10:40:48 UTC (rev 55978)
@@ -82,7 +82,13 @@
* True if this is a transacted session.
*/
private boolean transacted;
-
+
+ private boolean lazyInitialization;
+
+ private boolean recycleIdleSessions;
+
+ private long idleTimeOut;
+
/**
* The destination.
*/
@@ -118,9 +124,71 @@
* closed.
*/
private int numServerSessions = 0;
-
+
+ private ServerSessionPoolParams params;
+
private XidFactoryMBean xidFactory;
+
+ public StdServerSessionPool(final Destination destination,
+ final Connection con,
+ final boolean transacted,
+ final int ack,
+ final boolean useLocalTx,
+ final MessageListener listener,
+ final int minSession,
+ final int maxSession,
+ final long keepAlive,
+ final boolean lazy,
+ final boolean recycle,
+ final long idleTimeout,
+ final XidFactoryMBean xidFactory) throws JMSException
+ {
+
+ this.destination = destination;
+ this.con = con;
+ this.ack = ack;
+ this.listener = listener;
+ this.transacted = transacted;
+ this.minSize = minSession;
+ this.poolSize = maxSession;
+ this.keepAlive = keepAlive;
+ this.sessionPool = new ArrayList(maxSession);
+ this.useLocalTX = useLocalTx;
+ this.xidFactory = xidFactory;
+
+ // setup the worker pool
+ executor = new MyPooledExecutor(poolSize);
+ executor.setMinimumPoolSize(minSize);
+ executor.setKeepAliveTime(keepAlive);
+ executor.waitWhenBlocked();
+ executor.setThreadFactory(new DefaultThreadFactory());
+
+ this.lazyInitialization = lazy;
+ this.recycleIdleSessions = recycle;
+ this.idleTimeOut = idleTimeout;
+
+ // finish initializing the session
+ if(!lazyInitialization)
+ {
+ create();
+ }else
+ {
+ log.debug("SessionPool is specified as lazy init. No sessions being created.");
+
+ }
+
+ if(recycleIdleSessions)
+ {
+ log.debug("Session Pool registering for background recycling at interval" + idleTimeout);
+ JmsServerSessionReaper.registerSessionPool(this, idleTimeOut);
+
+ }
+
+ log.debug("Server Session pool set up");
+
+
+ }
/**
* Construct a <tt>StdServerSessionPool</tt> using the default pool size.
*
@@ -201,6 +269,15 @@
}
else
{
+ //Lazy create here
+ //Basically we are saying that we are a lazy pool
+ //And no sessions have been created or are outstanding
+ if(lazyInitialization && numServerSessions == 0)
+ {
+ session = createLazySession();
+ break;
+ }
+
try
{
sessionPool.wait();
@@ -222,7 +299,8 @@
log.trace("using server session: " + session);
return session;
}
-
+
+
/**
* Clear the pool, clear out both threads and ServerSessions,
* connection.stop() should be run before this method.
@@ -306,7 +384,7 @@
{
synchronized (sessionPool)
{
- if (closing)
+ if (closing || lazyInitialization)
{
session.close();
numServerSessions--;
@@ -327,7 +405,56 @@
}
// --- Private methods used internally
+
+ private ServerSession createLazySession() throws JMSException
+ {
+ boolean debug = log.isDebugEnabled();
+ Session ses = null;
+ XASession xaSes = null;
+ if (debug)
+ log.debug("initializing lazy session with connection: " + con);
+
+ if (destination instanceof Topic && con instanceof XATopicConnection)
+ {
+ xaSes = ((XATopicConnection)con).createXATopicSession();
+ ses = ((XATopicSession)xaSes).getTopicSession();
+ }
+ else if (destination instanceof Queue && con instanceof XAQueueConnection)
+ {
+ xaSes = ((XAQueueConnection)con).createXAQueueSession();
+ ses = ((XAQueueSession)xaSes).getQueueSession();
+ }
+ else if (destination instanceof Topic && con instanceof TopicConnection)
+ {
+ ses = ((TopicConnection)con).createTopicSession(transacted, ack);
+ log.warn("Using a non-XA TopicConnection. " +
+ "It will not be able to participate in a Global UOW");
+ }
+ else if (destination instanceof Queue && con instanceof QueueConnection)
+ {
+ ses = ((QueueConnection)con).createQueueSession(transacted, ack);
+ log.warn("Using a non-XA QueueConnection. " +
+ "It will not be able to participate in a Global UOW");
+ }
+ else
+ {
+ throw new JMSException("Connection was not reconizable: " + con + " for destination " + destination);
+ }
+
+ // create the server session and add it to the pool - it is up to the
+ // server session to set the listener
+ StdServerSession serverSession = new StdServerSession(this, ses, xaSes,
+ listener, useLocalTX, xidFactory);
+
+ numServerSessions++;
+
+ if (debug)
+ log.debug("" + serverSession);
+
+ return serverSession;
+ }
+
private void create() throws JMSException
{
boolean debug = log.isDebugEnabled();
@@ -379,7 +506,33 @@
log.debug("added server session to the pool: " + serverSession);
}
}
-
+
+ void removeTimedOut()
+ {
+ long timeout = System.currentTimeMillis() - idleTimeOut;
+
+ while(true)
+ {
+ synchronized(sessionPool)
+ {
+
+ StdServerSession session = (StdServerSession)sessionPool.get(0);
+
+ if(session.isTimedOut(timeout))
+ {
+ recycle(session);
+
+ }
+
+ else
+ break;
+
+ }
+
+ }
+
+
+ }
/**
* A pooled executor where the minimum pool size
* threads are kept alive
Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java 2006-08-16 09:48:23 UTC (rev 55977)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java 2006-08-16 10:40:48 UTC (rev 55978)
@@ -58,9 +58,24 @@
{
return xidFactory;
}
-
+
+ public javax.jms.ServerSessionPool getServerSessionPool(Destination destination, Connection con, int minSession, int maxSession, long keepAlive, boolean isTransacted, int ack, boolean useLocalTX, boolean lazyInit, boolean recycle, long idleTimeout, javax.jms.MessageListener listener) throws javax.jms.JMSException
+ {
+ ServerSessionPoolParams params = new ServerSessionPoolParams();
+ params.setMinSessionCount(minSession);
+ params.setMaxSessionCount(maxSession);
+ params.setKeepAlive(keepAlive);
+ params.setLazyInitialization(lazyInit);
+ params.setRecycleIdle(recycle);
+ params.setIdleTimeOutMinutes(idleTimeout);
+
+ ServerSessionPool pool = (ServerSessionPool)new StdServerSessionPool(destination, con, isTransacted, ack, useLocalTX, listener, minSession, maxSession, keepAlive, xidFactory);
+ return pool;
+ }
+
public javax.jms.ServerSessionPool getServerSessionPool(Destination destination, Connection con, int minSession, int maxSession, long keepAlive, boolean isTransacted, int ack, boolean useLocalTX, javax.jms.MessageListener listener) throws javax.jms.JMSException
{
+
ServerSessionPool pool = (ServerSessionPool)new StdServerSessionPool(destination, con, isTransacted, ack, useLocalTX, listener, minSession, maxSession, keepAlive, xidFactory);
return pool;
}
More information about the jboss-cvs-commits
mailing list