[Jboss-cvs] JBossAS SVN: r56205 - branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 23 18:47:43 EDT 2006


Author: weston.price at jboss.com
Date: 2006-08-23 18:47:42 -0400 (Wed, 23 Aug 2006)
New Revision: 56205

Modified:
   branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java
Log:
[JBAS-3511] Improvements for ASF integration, pool management, idle session collection
and dynamic allocation.

Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java	2006-08-23 22:47:31 UTC (rev 56204)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java	2006-08-23 22:47:42 UTC (rev 56205)
@@ -7,6 +7,7 @@
 package org.jboss.jms.asf;
 import EDU.oswego.cs.dl.util.concurrent.Executor;
 
+import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
 import java.util.ArrayList;
@@ -41,6 +42,7 @@
  *
  * @author    <a href="mailto:peter.antman at tim.se">Peter Antman</a> .
  * @author    <a href="mailto:hiram.chirino at jboss.org">Hiram Chirino</a> .
+ * @author    <a href="mailto:weston.price at jboss.com">Weston Price</a>
  * @version   $Revision$
  */
 public class StdServerSessionPool
@@ -118,17 +120,20 @@
     * Used to signal when the Pool is being closed down
     */
    private boolean closing = false;
-
+   
    /**
     * Used during close down to wait for all server sessions to be returned and
     * closed.
     */
    private int numServerSessions = 0;
+    
+   private XidFactoryMBean xidFactory;
    
-   private ServerSessionPoolParams params;
+   private FIFOSemaphore permits;
    
-   private XidFactoryMBean xidFactory;
    
+   private SessionPoolStatisticsCollector counter = new SessionPoolStatisticsCollector();
+   
    public StdServerSessionPool(final Destination destination, 
                            final Connection con, 
                            final boolean transacted, 
@@ -163,6 +168,8 @@
       executor.waitWhenBlocked();
       executor.setThreadFactory(new DefaultThreadFactory());
       
+      permits = new FIFOSemaphore(maxSession);
+      
       this.lazyInitialization = lazy;
       this.recycleIdleSessions = recycle;
       this.idleTimeOut = idleTimeout;
@@ -254,6 +261,9 @@
 
       try
       {
+    	 //Note, we will block until we get a new one to keep old behavior
+    	 permits.acquire();
+    	 
          while (true)
          {
             synchronized (sessionPool)
@@ -265,26 +275,15 @@
                else if (sessionPool.size() > 0)
                {
                   session = (ServerSession)sessionPool.remove(0);
+                  ((StdServerSession)session).setHasPermit(true);
                   break;
                }
                else
                {
-                  //Lazy create here
-                  //Basically we are saying that we are a lazy pool
-                  //And no sessions have been created or are outstanding
-                  if(lazyInitialization && numServerSessions == 0)
-                  {
-                     session = createLazySession();
-                     break;
-                  }
-                  
-                  try
-                  {
-                     sessionPool.wait();
-                  }
-                  catch (InterruptedException ignore)
-                  {
-                  }
+            	  log.trace("Creating lazy session.");
+            	  session = createLazySession();
+            	  ((StdServerSession)session).setHasPermit(true);
+            	  break;
                }
             }
          }
@@ -333,6 +332,9 @@
          sessionPool.clear();
          sessionPool.notifyAll();
       }
+      
+      //Unregister with the Reaper
+      JmsServerSessionReaper.unregisterSessionPool(this);
 
       //Must be outside synchronized block because of recycle method.
       executor.shutdownAfterProcessingCurrentlyQueuedTasks();
@@ -352,7 +354,7 @@
          }
       }
    }
-
+   
    /**
     * Get the executor we are using.
     *
@@ -362,7 +364,16 @@
    {
       return executor;
    }
+   
+   public int getTimedOut()
+   {
+      return counter.getTimedOutCount();
+   }
 
+   public int getCurrentSessionCount()
+   {
+      return numServerSessions;
+   }
    // --- Protected messages for StdServerSession to use
 
    /**
@@ -377,19 +388,52 @@
    
    void recycleIdleSession(StdServerSession session)
    {
+      
+	  if (session.getHasPermit()) 
+	  {
+		  session.setHasPermit(false);
+		  permits.release();
+	  }
+	  
       synchronized (sessionPool)
       {
          session.close();
+         sessionPool.remove(session);
          numServerSessions--;
-         if (numServerSessions == 0)
-         {
-            //notify clear thread.
-            sessionPool.notifyAll();
-         }
+         counter.timedOut();
          
       }
+
+      //Make sure we don't allow the pool to fall below the minimum
+      if(!closing && minSize > 0)
+         JmsSessionPoolFiller.fillPool(this);
    }
+   
+   public void fillToMin() throws JMSException
+   {
+	  
+	  try
+	  {
+	      permits.acquire();
+		  
+	      synchronized (sessionPool)
+	      {
+	    	  if(numServerSessions < minSize)
+	    		  create(minSize);
+	      
+	      }
+		  
+	  }catch(InterruptedException e)
+	  {
+		  log.debug("Interrupted exception in attemping to fill JMS Session pool to minimum allowance.", e);
+	  }
 
+      finally
+	  {
+		 permits.release();
+	  }
+   
+   }
    /**
     * Recycle a server session.
     *
@@ -397,12 +441,20 @@
     */
    void recycle(StdServerSession session)
    {
-      synchronized (sessionPool)
+	  
+	  if(session.getHasPermit())
+	  {
+		  session.setHasPermit(false);
+		  permits.release();
+	  }
+
+	  synchronized (sessionPool)
       {
          if (closing)
          {
             session.close();
             numServerSessions--;
+      
             if (numServerSessions == 0)
             {
                //notify clear thread.
@@ -411,8 +463,10 @@
          }
          else
          {
+            session.setLastUsed(System.currentTimeMillis());
             sessionPool.add(session);
             sessionPool.notifyAll();
+          
             if( log.isTraceEnabled() )
                log.trace("recycled server session: " + session);
          }
@@ -461,7 +515,6 @@
       // server session to set the listener
       StdServerSession serverSession = new StdServerSession(this, ses, xaSes,
          listener, useLocalTX, xidFactory);
-
       numServerSessions++;
       
       if (debug)
@@ -524,28 +577,37 @@
    
    void removeTimedOut()
    {
+   
       long timeout = System.currentTimeMillis() - idleTimeOut;
       
       while(true)
       {
-         synchronized(sessionPool)
+         synchronized (sessionPool)
          {
             
-            StdServerSession session = (StdServerSession)sessionPool.get(0);
-            
-            if(session.isTimedOut(timeout))
+            if(sessionPool.size() > 0)
             {
-               recycleIdleSession(session);            }
+               StdServerSession session = (StdServerSession)sessionPool.get(0);
+               
+               if (session.isTimedOut(timeout))
+               {
+                  recycleIdleSession(session);
+               }
+             
+               else
+               {
+                  break;
+               }
             
+            }
             else
-               break;
-            
+            	break;
          }
          
       }
+   }        
       
-      
-   }
+         
    /**
     * A pooled executor where the minimum pool size
     * threads are kept alive
@@ -601,4 +663,22 @@
          return thread;
       }
    }
+
+   private static class SessionPoolStatisticsCollector
+   {
+      private int timedOut;
+      
+      synchronized void timedOut()
+      {
+         timedOut++;
+         
+      }
+      
+      private int getTimedOutCount()
+      {
+         return timedOut;
+      }
+      
+    }
+
 }




More information about the jboss-cvs-commits mailing list