[Jboss-cvs] JBossAS SVN: r56208 - branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/ejb/plugins/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 23 18:48:34 EDT 2006


Author: weston.price at jboss.com
Date: 2006-08-23 18:48:33 -0400 (Wed, 23 Aug 2006)
New Revision: 56208

Modified:
   branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
   branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvokerMBean.java
Log:
[JBAS-3511] Improvements for ASF integration, pool management, idle session collection
and dynamic allocation.

Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java	2006-08-23 22:48:17 UTC (rev 56207)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java	2006-08-23 22:48:33 UTC (rev 56208)
@@ -60,6 +60,8 @@
  * @author <a href="mailto:sebastien.alborini at m4x.org">Sebastien Alborini</a>
  * @author <a href="mailto:marc.fleury at telkel.com">Marc Fleury</a>
  * @author <a href="mailto:jason at planet57.com">Jason Dillon</a>
+ * @author <a href="mailto:weston.price at jboss.com">Weston Price</a>
+ * 
  * @version <tt>$Revision$</tt>
  * @jmx:mbean extends="org.jboss.system.ServiceMBean"
  */
@@ -142,10 +144,10 @@
    protected int maxPoolSize = 15;
    
    /** The lazyInitialization */
-   protected boolean lazyInitialization;
+   protected boolean lazyInitialization = false;
    
    /** The recycleIdleSessions */
-   protected boolean recycleIdleSessions;
+   protected boolean recycleIdleSessions = false;
    
    /** The idleTimeoutMinutes */
    protected long idleTimeoutMinutes = 30 * 1000;
@@ -314,22 +316,62 @@
       this.maxMessagesNr = maxMessages;
    }
    
-   public long getIdleTimeoutMinutes()
+   public long getIdleTimeOutMinutes()
    {
       return this.idleTimeoutMinutes;
       
    }
    
-   public boolean isLazyIntialization()
+   public boolean getLazyInitialization()
    {
       return this.lazyInitialization;
    }
    
-   public boolean isRecycleIdleSesssions()
+   public void setLazyInitialization(boolean lazy)
    {
+      this.lazyInitialization = lazy;
+      
+   }
+   
+   public boolean getRecycleIdleSessions()
+   {
       return this.recycleIdleSessions;
    }
    
+   public void setRecycleIdleSessions(boolean recycle)
+   {
+      this.recycleIdleSessions = recycle;
+      
+   }
+   
+   public void setIdleTimeOutMinutes(long idleTimeout)
+   {
+      this.idleTimeoutMinutes = idleTimeout;
+   }
+   public int getSessionTimedOutCount()
+   {
+      int timedOut = 0;
+      if(pool != null)
+      {
+         StdServerSessionPool sp = (StdServerSessionPool)pool;
+         timedOut = sp.getTimedOut();
+      }
+      
+      return timedOut;
+   }
+   
+   public int getCurrentSessionCount() 
+   {
+      int currentCount = 0;
+      if(pool != null)
+      {
+         StdServerSessionPool sp = (StdServerSessionPool)pool;
+         currentCount = sp.getCurrentSessionCount();
+      }
+      
+      return currentCount;
+
+   }
    /**
     * @jmx:managed-attribute
     */
@@ -1324,7 +1366,7 @@
             context.lookup(serverSessionPoolFactoryJNDI);
          
          // the create the pool
-         pool = factory.getServerSessionPool(destination, connection, minSession, maxSession, keepAlive, isTransacted, ack, !isContainerManagedTx || isNotSupportedTx, listener);
+         pool = factory.getServerSessionPool(destination, connection, minSession, maxSession, keepAlive, isTransacted, ack, !isContainerManagedTx || isNotSupportedTx, lazyInit, recycleIdle, idleTimeout, listener);
       }
       finally
       {
@@ -1459,17 +1501,19 @@
       }
    }
 
-   /**
-    * ExceptionListener for failover handling.
-    */
-   class ExceptionListenerImpl
-      implements ExceptionListener, Runnable
+   /** ExceptionListener for failover handling. */
+   class ExceptionListenerImpl implements ExceptionListener
    {
+      Object lock = new Object();
       JMSContainerInvoker invoker;
       Thread currentThread;
-      boolean notStoped = true;
-      Throwable failure = null;
+      boolean notStopped = true;
 
+      /**
+       * Create a new ExceptionListenerImpl.
+       * 
+       * @param invoker the container invoker
+       */
       ExceptionListenerImpl(final JMSContainerInvoker invoker)
       {
          this.invoker = invoker;
@@ -1477,6 +1521,7 @@
 
       /**
        * Called on jms connection failure events
+       * 
        * @param ex the jms connection failure exception
        */
       public void onException(JMSException ex)
@@ -1491,73 +1536,120 @@
        */
       public void handleFailure(Throwable t)
       {
-         log.warn("JMS provider failure detected: ", t);
-         failure = t;
+         MessageDrivenMetaData metaData = invoker.getMetaData();
+         log.warn("JMS provider failure detected for " + metaData.getEjbName(), t);
          // Run the reconnection in the background
-         MessageDrivenMetaData metaData = invoker.getMetaData();
          String name = "JMSContainerInvoker("+metaData.getEjbName()+") Reconnect";
-         Thread retryThread = new Thread(this, name);
-         retryThread.setDaemon(true);
-         retryThread.start();
-      }
-
-      /**
-       * Try to reconnect to the jms provider until explicitly stopped.
-       */ 
-      public void run()
-      {
-         currentThread = Thread.currentThread();
-         boolean tryIt = true;
-         while (tryIt && notStoped)
+         synchronized (lock)
          {
+            if (currentThread != null)
+            {
+               log.debug("Already a reconnect thread: " + currentThread + " for " + metaData.getEjbName());
+               return;
+            }
+            Runnable runnable = new ExceptionListenerRunnable(t);
+            currentThread = new Thread(runnable, name);
             try
             {
-               invoker.innerStopDelivery();
+               currentThread.setDaemon(true);
+               currentThread.start();
             }
-            catch (Throwable t)
+            catch (RuntimeException rethrow)
             {
-               log.error("Unhandled error stopping connection", t);
+               currentThread = null;
+               throw rethrow;
             }
+            catch (Error rethrow)
+            {
+               currentThread = null;
+               throw rethrow;
+            }
+         }
+      }
 
-            sendNotification(FAILURE_NOTIFICATION, failure);
-            
+      class ExceptionListenerRunnable implements Runnable
+      {
+         Throwable failure;
+
+         /**
+          * Create a new ExceptionListenerRunnable.
+          * 
+          * @param failure the error
+          */
+         public ExceptionListenerRunnable(Throwable failure)
+         {
+            this.failure = failure;
+         }
+         
+         /**
+          * Try to reconnect to the jms provider until explicitly stopped.
+          */ 
+         public void run()
+         {
+            MessageDrivenMetaData metaData = invoker.getMetaData();
             try
             {
-               log.debug("Waiting for reconnect internal " + reconnectInterval + " ms");
-               try
+               boolean tryIt = true;
+               while (tryIt && notStopped)
                {
-                  Thread.sleep(reconnectInterval);
+                  try
+                  {
+                     invoker.innerStopDelivery();
+                  }
+                  catch (Throwable t)
+                  {
+                     log.error("Unhandled error stopping connection for " + metaData.getEjbName(), t);
+                  }
+
+                  sendNotification(FAILURE_NOTIFICATION, failure);
+                  
+                  try
+                  {
+                     log.info("Waiting for reconnect internal " + reconnectInterval + "ms for " + metaData.getEjbName());
+                     try
+                     {
+                        Thread.sleep(reconnectInterval);
+                     }
+                     catch (InterruptedException ie)
+                     {
+                        tryIt = false;
+                        return;
+                     }
+                     
+                     // Reboot container
+                     log.info("Trying to reconnect to JMS provider for " + metaData.getEjbName());
+                     invoker.innerStartDelivery();
+                     tryIt = false;
+
+                     log.info("Reconnected to JMS provider for " + metaData.getEjbName());
+                  }
+                  catch (Throwable t)
+                  {
+                     log.error("Reconnect failed: JMS provider failure detected for " + metaData.getEjbName(), t);
+                  }
                }
-               catch (InterruptedException ie)
+            }
+            finally
+            {
+               synchronized (lock)
                {
-                  tryIt = false;
-                  return;
+                  currentThread = null;
                }
-               
-               // Reboot container
-               log.info("Trying to reconnect to JMS provider");
-               invoker.innerStartDelivery();
-               tryIt = false;
-
-               log.info("Reconnected to JMS provider");
             }
-            catch (Exception e)
-            {
-               log.error("Reconnect failed: JMS provider failure detected:", e);
-            }
          }
-         currentThread = null;
       }
 
       void stop()
       {
-         log.debug("Stop requested");
-
-         notStoped = false;
-         if (currentThread != null)
+         synchronized (lock)
          {
-            currentThread.interrupt();
-            log.debug("Current thread interrupted");
+            log.debug("Stop requested for recovery thread: " + currentThread);
+            notStopped = false;
+            if (currentThread != null)
+            {
+               currentThread.interrupt();
+               log.debug("Recovery thread interrupted: " + currentThread);
+            }
          }
       }
    }

Modified: branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvokerMBean.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvokerMBean.java	2006-08-23 22:48:17 UTC (rev 56207)
+++ branches/JBoss_4_0_3_SP1_JBAS_3511/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvokerMBean.java	2006-08-23 22:48:33 UTC (rev 56208)
@@ -8,6 +8,7 @@
 
 /**
  * MBean interface.
+ * @author <a href="mailto:weston.price at jboss.com">Weston Price</a>
  */
 public interface JMSContainerInvokerMBean extends org.jboss.system.ServiceMBean
 {
@@ -38,10 +39,19 @@
 
    void stopDelivery() throws java.lang.Exception;
    
-   boolean isLazyIntialization();
+   boolean getLazyInitialization();
+   
+   void setLazyInitialization(boolean lazy);
       
-   boolean isRecycleIdleSesssions();
+   void setRecycleIdleSessions(boolean recycle);
    
-   long getIdleTimeoutMinutes();
+   boolean getRecycleIdleSessions();
    
+   long getIdleTimeOutMinutes();
+   
+   void setIdleTimeOutMinutes(long idleTimeout);
+   
+   int getSessionTimedOutCount();
+
+   int getCurrentSessionCount();
 }




More information about the jboss-cvs-commits mailing list