[Jboss-cvs] JBossAS SVN: r56205 - branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 23 18:47:43 EDT 2006
Author: weston.price at jboss.com
Date: 2006-08-23 18:47:42 -0400 (Wed, 23 Aug 2006)
New Revision: 56205
Modified:
branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java
Log:
[JBAS-3511] Improvements for ASF integration, pool management, idle session collection,
and dynamic allocation.
Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java 2006-08-23 22:47:31 UTC (rev 56204)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java 2006-08-23 22:47:42 UTC (rev 56205)
@@ -7,6 +7,7 @@
package org.jboss.jms.asf;
import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import java.util.ArrayList;
@@ -41,6 +42,7 @@
*
* @author <a href="mailto:peter.antman at tim.se">Peter Antman</a> .
* @author <a href="mailto:hiram.chirino at jboss.org">Hiram Chirino</a> .
+ * @author <a href="mailto:weston.price at jboss.com">Weston Price</a>
* @version $Revision$
*/
public class StdServerSessionPool
@@ -118,17 +120,20 @@
* Used to signal when the Pool is being closed down
*/
private boolean closing = false;
-
+
/**
* Used during close down to wait for all server sessions to be returned and
* closed.
*/
private int numServerSessions = 0;
+
+ private XidFactoryMBean xidFactory;
- private ServerSessionPoolParams params;
+ private FIFOSemaphore permits;
- private XidFactoryMBean xidFactory;
+ private SessionPoolStatisticsCollector counter = new SessionPoolStatisticsCollector();
+
public StdServerSessionPool(final Destination destination,
final Connection con,
final boolean transacted,
@@ -163,6 +168,8 @@
executor.waitWhenBlocked();
executor.setThreadFactory(new DefaultThreadFactory());
+ permits = new FIFOSemaphore(maxSession);
+
this.lazyInitialization = lazy;
this.recycleIdleSessions = recycle;
this.idleTimeOut = idleTimeout;
@@ -254,6 +261,9 @@
try
{
+ //Note, we will block until we get a new one to keep old behavior
+ permits.acquire();
+
while (true)
{
synchronized (sessionPool)
@@ -265,26 +275,15 @@
else if (sessionPool.size() > 0)
{
session = (ServerSession)sessionPool.remove(0);
+ ((StdServerSession)session).setHasPermit(true);
break;
}
else
{
- //Lazy create here
- //Basically we are saying that we are a lazy pool
- //And no sessions have been created or are outstanding
- if(lazyInitialization && numServerSessions == 0)
- {
- session = createLazySession();
- break;
- }
-
- try
- {
- sessionPool.wait();
- }
- catch (InterruptedException ignore)
- {
- }
+ log.trace("Creating lazy session.");
+ session = createLazySession();
+ ((StdServerSession)session).setHasPermit(true);
+ break;
}
}
}
@@ -333,6 +332,9 @@
sessionPool.clear();
sessionPool.notifyAll();
}
+
+ //Unregister with the Reaper
+ JmsServerSessionReaper.unregisterSessionPool(this);
//Must be outside synchronized block because of recycle method.
executor.shutdownAfterProcessingCurrentlyQueuedTasks();
@@ -352,7 +354,7 @@
}
}
}
-
+
/**
* Get the executor we are using.
*
@@ -362,7 +364,16 @@
{
return executor;
}
+
+ public int getTimedOut()
+ {
+ return counter.getTimedOutCount();
+ }
+ public int getCurrentSessionCount()
+ {
+ return numServerSessions;
+ }
// --- Protected messages for StdServerSession to use
/**
@@ -377,19 +388,52 @@
void recycleIdleSession(StdServerSession session)
{
+
+ if (session.getHasPermit())
+ {
+ session.setHasPermit(false);
+ permits.release();
+ }
+
synchronized (sessionPool)
{
session.close();
+ sessionPool.remove(session);
numServerSessions--;
- if (numServerSessions == 0)
- {
- //notify clear thread.
- sessionPool.notifyAll();
- }
+ counter.timedOut();
}
+
+ //Make sure we don't allow the pool to fall below the minimum
+ if(!closing && minSize > 0)
+ JmsSessionPoolFiller.fillPool(this);
}
+
+ public void fillToMin() throws JMSException
+ {
+
+ try
+ {
+ permits.acquire();
+
+ synchronized (sessionPool)
+ {
+ if(numServerSessions < minSize)
+ create(minSize);
+
+ }
+
+ }catch(InterruptedException e)
+ {
+ log.debug("Interrupted exception in attemping to fill JMS Session pool to minimum allowance.", e);
+ }
+ finally
+ {
+ permits.release();
+ }
+
+ }
/**
* Recycle a server session.
*
@@ -397,12 +441,20 @@
*/
void recycle(StdServerSession session)
{
- synchronized (sessionPool)
+
+ if(session.getHasPermit())
+ {
+ session.setHasPermit(false);
+ permits.release();
+ }
+
+ synchronized (sessionPool)
{
if (closing)
{
session.close();
numServerSessions--;
+
if (numServerSessions == 0)
{
//notify clear thread.
@@ -411,8 +463,10 @@
}
else
{
+ session.setLastUsed(System.currentTimeMillis());
sessionPool.add(session);
sessionPool.notifyAll();
+
if( log.isTraceEnabled() )
log.trace("recycled server session: " + session);
}
@@ -461,7 +515,6 @@
// server session to set the listener
StdServerSession serverSession = new StdServerSession(this, ses, xaSes,
listener, useLocalTX, xidFactory);
-
numServerSessions++;
if (debug)
@@ -524,28 +577,37 @@
void removeTimedOut()
{
+
long timeout = System.currentTimeMillis() - idleTimeOut;
while(true)
{
- synchronized(sessionPool)
+ synchronized (sessionPool)
{
- StdServerSession session = (StdServerSession)sessionPool.get(0);
-
- if(session.isTimedOut(timeout))
+ if(sessionPool.size() > 0)
{
- recycleIdleSession(session); }
+ StdServerSession session = (StdServerSession)sessionPool.get(0);
+
+ if (session.isTimedOut(timeout))
+ {
+ recycleIdleSession(session);
+ }
+
+ else
+ {
+ break;
+ }
+ }
else
- break;
-
+ break;
}
}
+ }
-
- }
+
/**
* A pooled executor where the minimum pool size
* threads are kept alive
@@ -601,4 +663,22 @@
return thread;
}
}
+
+ private static class SessionPoolStatisticsCollector
+ {
+ private int timedOut;
+
+ synchronized void timedOut()
+ {
+ timedOut++;
+
+ }
+
+ private int getTimedOutCount()
+ {
+ return timedOut;
+ }
+
+ }
+
}
More information about the jboss-cvs-commits
mailing list