[Jboss-cvs] JBossAS SVN: r55978 - branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 16 06:40:50 EDT 2006


Author: weston.price at jboss.com
Date: 2006-08-16 06:40:48 -0400 (Wed, 16 Aug 2006)
New Revision: 55978

Modified:
   branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java
   branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java
   branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java
Log:
[JBAS-3511] JMS asf server improvements. 

Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java	2006-08-16 09:48:23 UTC (rev 55977)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java	2006-08-16 10:40:48 UTC (rev 55978)
@@ -75,6 +75,8 @@
 
    private XidFactoryMBean xidFactory;
    
+   private long lastUse;
+   
    /**
     * Create a <tt>StdServerSession</tt> .
     *
@@ -425,5 +427,10 @@
       serverSessionPool.recycle(this);
    }
    
+   /** Reports whether this session's lastUse is older than the given cutoff time. */
+   boolean isTimedOut(long timeout)
+   {
+      return timeout > lastUse;
+   }
 }
 

Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java	2006-08-16 09:48:23 UTC (rev 55977)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java	2006-08-16 10:40:48 UTC (rev 55978)
@@ -82,7 +82,13 @@
     * True if this is a transacted session.
     */
    private boolean transacted;
-
+   
+   private boolean lazyInitialization;
+   
+   private boolean recycleIdleSessions;
+   
+   private long idleTimeOut;
+   
    /**
     * The destination.
     */
@@ -118,9 +124,71 @@
     * closed.
     */
    private int numServerSessions = 0;
-
+   
+   private ServerSessionPoolParams params;
+   
    private XidFactoryMBean xidFactory;
+   
+   /**
+    * Construct a <tt>StdServerSessionPool</tt> with optional lazy creation and idle recycling.
+    *
+    * @param destination the destination sessions are created for
+    * @param con         connection used to create the sessions
+    * @param transacted  transacted flag used for non-XA sessions
+    * @param ack         acknowledge mode used for non-XA sessions
+    * @param useLocalTx  passed to each <tt>StdServerSession</tt>
+    * @param listener    listener passed to each <tt>StdServerSession</tt>
+    * @param minSession  minimum size of the worker thread pool
+    * @param maxSession  maximum worker pool size and initial list capacity
+    * @param keepAlive   keep alive time set on the worker pool
+    * @param lazy        when true no sessions are created here
+    * @param recycle     when true the pool registers with the session reaper
+    * @param idleTimeout idle limit, also handed to the reaper on registration
+    * @param xidFactory  passed to each <tt>StdServerSession</tt>
+    * @throws JMSException if the eager call to create() fails
+    */
+   public StdServerSessionPool(final Destination destination, final Connection con,
+         final boolean transacted, final int ack, final boolean useLocalTx,
+         final MessageListener listener, final int minSession, final int maxSession,
+         final long keepAlive, final boolean lazy, final boolean recycle,
+         final long idleTimeout, final XidFactoryMBean xidFactory) throws JMSException
+   {
+      this.destination = destination;
+      this.con = con;
+      this.ack = ack;
+      this.listener = listener;
+      this.transacted = transacted;
+      this.minSize = minSession;
+      this.poolSize = maxSession;
+      this.keepAlive = keepAlive;
+      this.sessionPool = new ArrayList(maxSession);
+      this.useLocalTX = useLocalTx;
+      this.xidFactory = xidFactory;

+      // setup the worker pool
+      executor = new MyPooledExecutor(poolSize);
+      executor.setMinimumPoolSize(minSize);
+      executor.setKeepAliveTime(keepAlive);
+      executor.waitWhenBlocked();
+      executor.setThreadFactory(new DefaultThreadFactory());
+
+      this.lazyInitialization = lazy;
+      this.recycleIdleSessions = recycle;
+      this.idleTimeOut = idleTimeout;
+
+      // eager pools create their sessions now; lazy pools wait for first use
+      if (lazyInitialization)
+         log.debug("SessionPool is specified as lazy init. No sessions being created.");
+      else
+         create();
+
+      if (recycleIdleSessions)
+      {
+         log.debug("Session Pool registering for background recycling at interval " + idleTimeOut);
+         JmsServerSessionReaper.registerSessionPool(this, idleTimeOut);
+      }
+      log.debug("Server Session pool set up");
+   }
    /**
     * Construct a <tt>StdServerSessionPool</tt> using the default pool size.
     *
@@ -201,6 +269,15 @@
                }
                else
                {
+                  //Lazy create here
+                  //Basically we are saying that we are a lazy pool
+                  //And no sessions have been created or are outstanding
+                  if(lazyInitialization && numServerSessions == 0)
+                  {
+                     session = createLazySession();
+                     break;
+                  }
+                  
                   try
                   {
                      sessionPool.wait();
@@ -222,7 +299,8 @@
          log.trace("using server session: " + session);
       return session;
    }
-
+   
+     
    /**
     * Clear the pool, clear out both threads and ServerSessions,
     * connection.stop() should be run before this method.
@@ -306,7 +384,7 @@
    {
       synchronized (sessionPool)
       {
-         if (closing)
+         if (closing || lazyInitialization)
          {
             session.close();
             numServerSessions--;
@@ -327,7 +405,56 @@
    }
 
    // --- Private methods used internally
+   
+   private ServerSession createLazySession() throws JMSException
+   {
+      boolean debug = log.isDebugEnabled();
+      Session ses = null;
+      XASession xaSes = null;
 
+      if (debug)
+         log.debug("initializing lazy session with connection: " + con);
+
+      if (destination instanceof Topic && con instanceof XATopicConnection)
+      {
+         xaSes = ((XATopicConnection)con).createXATopicSession();
+         ses = ((XATopicSession)xaSes).getTopicSession();
+      }
+      else if (destination instanceof Queue && con instanceof XAQueueConnection)
+      {
+         xaSes = ((XAQueueConnection)con).createXAQueueSession();
+         ses = ((XAQueueSession)xaSes).getQueueSession();
+      }
+      else if (destination instanceof Topic && con instanceof TopicConnection)
+      {
+         ses = ((TopicConnection)con).createTopicSession(transacted, ack);
+         log.warn("Using a non-XA TopicConnection.  " +
+               "It will not be able to participate in a Global UOW");
+      }
+      else if (destination instanceof Queue && con instanceof QueueConnection)
+      {
+         ses = ((QueueConnection)con).createQueueSession(transacted, ack);
+         log.warn("Using a non-XA QueueConnection.  " +
+               "It will not be able to participate in a Global UOW");
+      }
+      else
+      {
+         throw new JMSException("Connection was not reconizable: " + con + " for destination " + destination);
+      }
+
+      // create the server session and add it to the pool - it is up to the
+      // server session to set the listener
+      StdServerSession serverSession = new StdServerSession(this, ses, xaSes,
+         listener, useLocalTX, xidFactory);
+
+      numServerSessions++;
+      
+      if (debug)
+         log.debug("" + serverSession);
+      
+      return serverSession;
+   }
+   
    private void create() throws JMSException
    {
       boolean debug = log.isDebugEnabled();
@@ -379,7 +506,33 @@
             log.debug("added server session to the pool: " + serverSession);
       }
    }
-
+   
+   void removeTimedOut()
+   {
+      long timeout = System.currentTimeMillis() - idleTimeOut;
+      
+      while(true)
+      {
+         synchronized(sessionPool)
+         {
+            
+            StdServerSession session = (StdServerSession)sessionPool.get(0);
+            
+            if(session.isTimedOut(timeout))
+            {
+               recycle(session);
+               
+            }
+            
+            else
+               break;
+            
+         }
+         
+      }
+      
+      
+   }
    /**
     * A pooled executor where the minimum pool size
     * threads are kept alive

Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java	2006-08-16 09:48:23 UTC (rev 55977)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java	2006-08-16 10:40:48 UTC (rev 55978)
@@ -58,9 +58,24 @@
    {
       return xidFactory;
    }
-
+   
+   /**
+    * Creates a server session pool honouring the lazy initialization and
+    * idle recycling options.
+    * NOTE(review): idleTimeout is forwarded unchanged and the pool subtracts it
+    * from System.currentTimeMillis(); confirm callers supply milliseconds.
+    */
+   public javax.jms.ServerSessionPool getServerSessionPool(Destination destination, Connection con, int minSession, int maxSession, long keepAlive, boolean isTransacted, int ack, boolean useLocalTX, boolean lazyInit, boolean recycle, long idleTimeout, javax.jms.MessageListener listener) throws javax.jms.JMSException
+   {
+      // Must use the constructor overload that takes lazyInit/recycle/idleTimeout; the shorter one ignores them.
+      ServerSessionPool pool = (ServerSessionPool)new StdServerSessionPool(destination, con, isTransacted, ack, useLocalTX, listener,
+            minSession, maxSession, keepAlive, lazyInit, recycle, idleTimeout, xidFactory);
+      return pool;
+   }
+   
    public javax.jms.ServerSessionPool getServerSessionPool(Destination destination, Connection con, int minSession, int maxSession, long keepAlive, boolean isTransacted, int ack, boolean useLocalTX, javax.jms.MessageListener listener) throws javax.jms.JMSException
    {
+      
       ServerSessionPool pool = (ServerSessionPool)new StdServerSessionPool(destination, con, isTransacted, ack, useLocalTX, listener, minSession, maxSession, keepAlive, xidFactory);
       return pool;
    }




More information about the jboss-cvs-commits mailing list