[Jboss-cvs] JBossAS SVN: r56642 - branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Sep 8 06:15:27 EDT 2006


Author: weston.price at jboss.com
Date: 2006-09-08 06:15:25 -0400 (Fri, 08 Sep 2006)
New Revision: 56642

Modified:
   branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java
   branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java
Log:
[JBAS-3511] Improvements for ASF integration, pool management, idle session collection,
and dynamic allocation.

Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java	2006-09-08 08:28:29 UTC (rev 56641)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSession.java	2006-09-08 10:15:25 UTC (rev 56642)
@@ -106,8 +106,10 @@
       this.session = session;
       this.xaSession = xaSession;
       this.delegateListener = delegateListener;
+      
       if( xaSession == null )
          useLocalTX = false;
+      
       this.useLocalTX = useLocalTX;
       this.xidFactory = xidFactory;
 
@@ -115,10 +117,12 @@
          log.debug("initializing (pool, session, xaSession, useLocalTX): " +
             pool + ", " + session + ", " + xaSession + ", " + useLocalTX);
       
-      // Set out self as message listener
       if (xaSession != null)
+         
          xaSession.setMessageListener(this);
+      
       else
+        
          session.setMessageListener(this);
       
       InitialContext ctx = null;
@@ -183,24 +187,39 @@
    public void run()
    {
       boolean trace = log.isTraceEnabled();
+      
       if (trace)
-         log.trace("running...");
+         log.trace("Running StdServerSession" + this);
+      
+      TransactionDemarcationStrategy td = null;
+      
+      if (StdServerSessionPool.USE_OLD == false)
+      {
+         td = createTransactionDemarcation();
+         
+         if (td == null)
+            return;
+      }
+
       try
       {
-         if (xaSession != null)
+         if (StdServerSessionPool.USE_OLD && xaSession != null)   
             xaSession.run();
+      
          else
             session.run();
-      }
-      finally
+
+      }catch(Throwable t)
       {
-         if (trace)
-            log.trace("recycling...");
-        
+         if (td != null)
+            td.error();
+         
+      }finally
+      {
+         if(td != null)
+            td.end();
+                 
          recycle();
-
-         if (trace)
-            log.trace("finished run");
       }
    }
 
@@ -219,152 +238,40 @@
    public void onMessage(Message msg)
    {      
       boolean trace = log.isTraceEnabled();
+      
       if( trace )
          log.trace("onMessage running (pool, session, xaSession, useLocalTX): " +
          ", " + session + ", " + xaSession + ", " + useLocalTX);
-
-      // Used if run with useLocalTX if true
-      Xid localXid = null;
-      boolean localRollbackFlag=false;
-      // Used if run with useLocalTX if false
-      Transaction trans = null;
+      
+      TransactionDemarcationStrategy td = null;
+      
+      if (StdServerSessionPool.USE_OLD)
+      {
+         td = createTransactionDemarcation();
+         if (td == null)
+            return;
+      }
+      
       try
       {
+         delegateListener.onMessage(msg);
          
-         if (useLocalTX)
-         {
-            // Use JBossMQ One Phase Commit to commit the TX
-            localXid = xidFactory.newXid();//new XidImpl();
-            XAResource res = xaSession.getXAResource();
-            res.start(localXid, XAResource.TMNOFLAGS);
-            
-            if( trace )
-               log.trace("Using optimized 1p commit to control TX.");
-         }
-         else
-         {
-            
-            // Use the TM to control the TX
-            tm.begin();
-            trans = tm.getTransaction();
-            
-            if (xaSession != null)
-            {
-               XAResource res = xaSession.getXAResource();
-               if (!trans.enlistResource(res))
-	       {
-		  throw new JMSException("could not enlist resource");
-	       }
-               if( trace )
-                  log.trace("XAResource '" + res + "' enlisted.");
-            }
-         }
-         //currentTransactionId = connection.spyXAResourceManager.startTx();
-         
-         // run the session
-         //session.run();
-         // Call delegate listener
-         delegateListener.onMessage(msg);
-      }
-      catch (Exception e)
+      }catch(Throwable t)
       {
-         log.error("session failed to run; setting rollback only", e);
+         log.error("Unexpected error invoking on message " + msg, t);
          
-         if (useLocalTX)
-         {
-            // Use JBossMQ One Phase Commit to commit the TX
-            localRollbackFlag = true;
-         }
-         else
-         {
-            // Mark for tollback TX via TM
-            try
-            {
-               // The transaction will be rolledback in the finally
-               if( trace )
-                  log.trace("Using TM to mark TX for rollback.");
-               trans.setRollbackOnly();
-            }
-            catch (Exception x)
-            {
-               log.error("failed to set rollback only", x);
-            }
-         }
+         if(td != null)
+            td.error();
          
       }
       finally
       {
-         try
-         {
-            if (useLocalTX)
-            {
-               if( localRollbackFlag == true  )
-               {
-                  if( trace )
-                     log.trace("Using optimized 1p commit to rollback TX.");
-                  
-                  XAResource res = xaSession.getXAResource();
-                  res.end(localXid, XAResource.TMSUCCESS);
-                  res.rollback(localXid);
-                  
-               }
-               else
-               {
-                  if( trace )
-                     log.trace("Using optimized 1p commit to commit TX.");
-                  
-                  XAResource res = xaSession.getXAResource();
-                  res.end(localXid, XAResource.TMSUCCESS);
-                  res.commit(localXid, true);
-               }
-            }
-            else
-            {
-               // Use the TM to commit the Tx (assert the correct association) 
-               Transaction currentTx = tm.getTransaction();
-               if (trans.equals(currentTx) == false)
-                  throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
-
-               // Marked rollback
-               if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
-               {
-                  if( trace )
-                     log.trace("Rolling back JMS transaction");
-                  // actually roll it back
-                  tm.rollback();
-                  
-                  // NO XASession? then manually rollback.
-                  // This is not so good but
-                  // it's the best we can do if we have no XASession.
-                  if (xaSession == null && serverSessionPool.isTransacted())
-                  {
-                     session.rollback();
-                  }
-               }
-               else if (trans.getStatus() == Status.STATUS_ACTIVE)
-               {
-                  // Commit tx
-                  // This will happen if
-                  // a) everything goes well
-                  // b) app. exception was thrown
-                  if( trace )
-                     log.trace("Commiting the JMS transaction");
-                  tm.commit();
-
-                  // NO XASession? then manually commit.  This is not so good but
-                  // it's the best we can do if we have no XASession.
-                  if (xaSession == null && serverSessionPool.isTransacted())
-                  {
-                     session.commit();
-                  }
-               }
-            }            
-         }
-         catch (Exception e)
-         {
-            log.error("failed to commit/rollback", e);
-         }
+         if(td != null)
+            td.end();
       }
+    
+      
+     
       if( trace )
          log.trace("onMessage done");
    }
@@ -433,7 +340,7 @@
     */
    void recycle()
    {
-      serverSessionPool.recycle(this);
+      serverSessionPool.recycle(this, false);
    }
    
    boolean isTimedOut(long timeout)
@@ -455,5 +362,219 @@
    {
 	   return this.hasPermit;
    }
+
+   private TransactionDemarcationStrategy createTransactionDemarcation()
+   {
+      return new DemarcationStrategyFactory().getStrategy();      
+   }
+   
+   private interface TransactionDemarcationStrategy
+   {
+      void error();
+      void end();
+      
+   }
+   
+   private class DemarcationStrategyFactory
+   {
+      
+      public DemarcationStrategyFactory()
+      {
+      }
+
+      TransactionDemarcationStrategy getStrategy()
+      {
+         
+         if(!useLocalTX)
+         {
+            try
+            {
+               log.trace("Creating XATransactionDemarcationStrategy");
+               return new XATransactionDemarcationStrategy();
+            }
+            catch (Throwable t)
+            {
+               log.error(this + " error creating transaction demarcation ", t);
+               return null;
+            }         
+          
+         }else
+         {
+            log.trace("Creating LocalDemarcationStrategy");
+            return new LocalDemarcationStrategy();
+            
+         }
+         
+      }
+   
+   }
+   private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
+   {
+      public void end()
+      {
+         if(serverSessionPool.isTransacted())
+         {
+            if(session != null)
+            {
+               try
+               {
+                  session.commit();
+               }
+               catch (JMSException e)
+               {
+                  log.error("Failed to commit session transaction", e);
+               }
+            }
+         }
+      }
+      
+      public void error()
+      {
+         if(serverSessionPool.isTransacted())
+         {
+            if(session != null)
+               
+               try
+               {
+                  session.rollback();
+               }
+               catch (JMSException e)
+               {
+                  log.error("Failed to rollback session transaction", e);
+               }
+            
+         }
+      }
+   
+   }
+
+   private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
+   {
+     
+      boolean trace = log.isTraceEnabled();
+      
+      Transaction trans = null;
+      
+      public XATransactionDemarcationStrategy() throws Throwable
+      {
+            
+            tm.begin();
+
+            try
+            {
+               trans = tm.getTransaction();
+
+               if (trace)
+                  log.trace(StdServerSession.this + " using tx=" + trans);
+
+               if (xaSession != null)
+               {
+                  XAResource res = xaSession.getXAResource();
+
+                  if (!trans.enlistResource(res))
+                  {
+                     throw new JMSException("could not enlist resource");
+                  }
+                  if (trace)
+                     log.trace(StdServerSession.this + " XAResource '" + res + "' enlisted.");
+               }
+            }
+            catch (Throwable t)
+            {
+               try
+               {
+                  tm.rollback();
+               }
+               catch (Throwable ignored)
+               {
+                  log.trace(StdServerSession.this + " ignored error rolling back after failed enlist", ignored);
+               }
+               throw t;
+            }
+
+       }
+         
+      
+      public void error()
+      {
+         try
+         {
+
+            if (trace)
+               log.trace(StdServerSession.this + " using TM to mark TX for rollback tx=" + trans);
+            
+            trans.setRollbackOnly();
+         }
+         catch (Throwable t)
+         {
+            log.error(StdServerSession.this + " failed to set rollback only", t);
+         }
+
+      }
+      
+      public void end()
+      {
+         try
+         {
+
+            // Use the TM to commit the Tx (assert the correct association) 
+            Transaction currentTx = tm.getTransaction();
+            
+            if (trans.equals(currentTx) == false)
+               throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
+
+            // Marked rollback
+            if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
+            {
+               if (trace)
+                  log.trace(StdServerSession.this + " rolling back JMS transaction tx=" + trans);
+               // actually roll it back
+               tm.rollback();
+
+               // NO XASession? then manually rollback.
+               // This is not so good but
+               // it's the best we can do if we have no XASession.
+               if (xaSession == null && serverSessionPool.isTransacted())
+               {
+                  session.rollback();
+               }
+            }
+            else if (trans.getStatus() == Status.STATUS_ACTIVE)
+            {
+               // Commit tx
+               // This will happen if
+               // a) everything goes well
+               // b) app. exception was thrown
+               if (trace)
+                  log.trace(StdServerSession.this + " commiting the JMS transaction tx=" + trans);
+               tm.commit();
+
+               // NO XASession? then manually commit.  This is not so good but
+               // it's the best we can do if we have no XASession.
+               if (xaSession == null && serverSessionPool.isTransacted())
+               {
+                  session.commit();
+               }
+            }
+            else
+            {
+               tm.suspend();
+             
+               if (xaSession == null && serverSessionPool.isTransacted())
+               {
+                  session.commit();
+               }
+            }
+            
+
+         }
+         catch (Throwable t)
+         {
+            log.error(StdServerSession.this + " failed to commit/rollback", t);
+         }
+
+      }
+
+   }
 }
 

Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java	2006-09-08 08:28:29 UTC (rev 56641)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/jms/asf/StdServerSessionPool.java	2006-09-08 10:15:25 UTC (rev 56642)
@@ -5,14 +5,10 @@
  * See terms of license at gnu.org.
  */
 package org.jboss.jms.asf;
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-
-import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
-import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
-import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Iterator;
-
 import java.util.List;
 
 import javax.jms.Connection;
@@ -35,6 +31,11 @@
 import org.jboss.logging.Logger;
 import org.jboss.tm.XidFactoryMBean;
 
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
+
 /**
  * Implementation of ServerSessionPool. <p>
  *
@@ -48,6 +49,18 @@
 public class StdServerSessionPool
        implements ServerSessionPool
 {
+   public static final boolean USE_OLD;
+   
+   static
+   {
+      USE_OLD = ((Boolean) AccessController.doPrivileged(new PrivilegedAction()
+      {
+         public Object run()
+         {
+            return new Boolean(System.getProperty("org.jboss.jms.asf.useold", "false"));
+         }
+      })).booleanValue();
+   }
    /**
     * The thread group which session workers will run.
     */
@@ -85,10 +98,13 @@
     */
    private boolean transacted;
    
+   /** The lazyInitialization */
    private boolean lazyInitialization;
    
+   /** The recycleIdleSessions */
    private boolean recycleIdleSessions;
    
+   /** The idleTimeOut */
    private long idleTimeOut;
    
    /**
@@ -177,11 +193,12 @@
       // finish initializing the session
       if(!lazyInitialization)
       {
+         log.debug("StdServerSessionPool is configured for non-lazy inititalization. " + poolSize + " sessions being created for use." );
          create(this.poolSize);         
 
       }else
       {
-         log.debug("SessionPool is specified as lazy init. No sessions being created.");
+         log.debug("StdServerSessionPool is specified as lazy init. " + poolSize + " session(s) being created");
          create(this.minSize);
       }
       
@@ -276,6 +293,7 @@
                {
                   session = (ServerSession)sessionPool.remove(0);
                   ((StdServerSession)session).setHasPermit(true);
+                  counter.inUse();
                   break;
                }
                else
@@ -283,7 +301,8 @@
             	  log.trace("Creating lazy session.");
             	  session = createLazySession();
             	  ((StdServerSession)session).setHasPermit(true);
-            	  break;
+                  counter.inUse();
+                  break;
                }
             }
          }
@@ -296,6 +315,9 @@
       // assert session != null
       if( log.isTraceEnabled() )
          log.trace("using server session: " + session);
+      
+      int size = (int)(poolSize - permits.permits());
+      counter.maxSessionInUse(size);
       return session;
    }
    
@@ -374,6 +396,16 @@
    {
       return numServerSessions;
    }
+   
+   public int getCurrentSessionInUseCount()
+   {
+      return counter.getInUseCount();
+   }
+   
+   public int getSessionMaxInUseCount()
+   {
+      return counter.getMaxSessionInUse();
+   }
    // --- Protected messages for StdServerSession to use
 
    /**
@@ -439,7 +471,7 @@
     *
     * @param session  Description of Parameter
     */
-   void recycle(StdServerSession session)
+   void recycle(StdServerSession session, boolean isTimedOut)
    {
 	  
 	  if(session.getHasPermit())
@@ -450,16 +482,20 @@
 
 	  synchronized (sessionPool)
       {
-         if (closing)
+         if(!isTimedOut)
+            counter.decInUse();
+         
+         if (closing || isTimedOut)
          {
             session.close();
             numServerSessions--;
-      
+            
             if (numServerSessions == 0)
             {
                //notify clear thread.
                sessionPool.notifyAll();
             }
+         
          }
          else
          {
@@ -471,6 +507,9 @@
                log.trace("recycled server session: " + session);
          }
       }
+	  
+     
+   
    }
 
    // --- Private methods used internally
@@ -579,6 +618,8 @@
    {
    
       long timeout = System.currentTimeMillis() - idleTimeOut;
+      boolean trace = log.isTraceEnabled();
+      List timedOut = new ArrayList();
       
       while(true)
       {
@@ -591,7 +632,15 @@
                
                if (session.isTimedOut(timeout))
                {
-                  recycleIdleSession(session);
+                  sessionPool.remove(0);                  
+                  timedOut.add(session);
+                
+                  if(trace)
+                  {
+                     log.trace("Recycling timed out session" + session);
+                     
+                  }
+                  
                }
              
                else
@@ -600,11 +649,26 @@
                }
             
             }
+            
             else
             	break;
          }
          
       }
+      
+      counter.timedOut(timedOut.size());
+      
+      for(Iterator iter = timedOut.iterator(); iter.hasNext();)
+      {
+         StdServerSession session = (StdServerSession)iter.next();
+         log.trace("Recycling timed out StdServerSession" + session);
+         recycle(session, true);
+         
+      }
+      
+      if(!closing && minSize > 0)
+         JmsSessionPoolFiller.fillPool(this);
+   
    }        
       
          
@@ -663,22 +727,56 @@
          return thread;
       }
    }
-
+   
+   //Simple class to record SessionStatistics
    private static class SessionPoolStatisticsCollector
    {
       private int timedOut;
+      private int inUse;
+      private int maxInUse;
       
+      synchronized void decInUse()
+      {
+         --inUse;
+      }
+      
+      synchronized void inUse()
+      {
+         ++inUse;
+      }
+      
+      synchronized void timedOut(int count)
+      {
+         timedOut += count;
+      }
       synchronized void timedOut()
       {
-         timedOut++;
+         ++timedOut;
          
+      }      
+            
+      synchronized void maxSessionInUse(int count)
+      {
+         if(count > maxInUse)
+            maxInUse = count;
       }
       
+      private int getMaxSessionInUse()
+      {
+         return maxInUse;
+      }
+
       private int getTimedOutCount()
       {
          return timedOut;
       }
       
-    }
+      private int getInUseCount()
+      {
+         return inUse;
+      }
 
+   
+   }
+
 }




More information about the jboss-cvs-commits mailing list