[jboss-svn-commits] JBL Code SVN: r38095 - in labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta: src/org/jboss/soa/esb/listeners/message and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sun May 20 14:15:02 EDT 2012


Author: kevin.conner at jboss.com
Date: 2012-05-20 14:14:59 -0400 (Sun, 20 May 2012)
New Revision: 38095

Added:
   labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedLifecycleController.java
   labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedManagedLifecycleAdapter.java
Modified:
   labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/AbstractManagedLifecycle.java
   labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/LifecycleController.java
   labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
   labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java
Log:
Added support for altering the minimum/maximum thread count for a MessageAwareListener: JBESB-3785

Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/AbstractManagedLifecycle.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/AbstractManagedLifecycle.java	2012-05-18 17:13:22 UTC (rev 38094)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/AbstractManagedLifecycle.java	2012-05-20 18:14:59 UTC (rev 38095)
@@ -110,8 +110,6 @@
         }
 
         this.config = config;
-       
-        lifecycleController = new LifecycleController(new LifecycleControllerAdapter());
     }
     
     /**
@@ -128,6 +126,8 @@
         if (!ManagedLifecycleState.INITIALISED.equals(getState()))
         {
             changeState(ManagedLifecycleState.INITIALISING) ;
+            
+            lifecycleController = getLifecycleController() ;
             try
             {
                 doInitialise() ;
@@ -149,6 +149,15 @@
     }
     
     /**
+     * Get a lifecycle controller for this lifecycle
+     * @return The lifecycle controller
+     */
+    protected LifecycleController getLifecycleController()
+    {
+        return new LifecycleController(new LifecycleControllerAdapter()) ;        
+    }
+    
+    /**
      * Handle the initialisation of the managed instance.
      * 
      * @throws ManagedLifecycleException for errors while initialisation.

Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/LifecycleController.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/LifecycleController.java	2012-05-18 17:13:22 UTC (rev 38094)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/LifecycleController.java	2012-05-20 18:14:59 UTC (rev 38095)
@@ -22,7 +22,9 @@
 package org.jboss.soa.esb.listeners.lifecycle;
 
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -235,12 +237,7 @@
 	 *  invocation methods - start/stop/initialise/destroy on the Lifecycle. 
 	 */
     public MBeanInfo getMBeanInfo() {
-		SortedSet<String> names = new TreeSet<String>();
-        for (Object name : m_lifecycle.getConfig().getAttributeNames())
-            names.add((String) name);
-        names.add(LIFECYCLESTATE_ATTRIB);
-        names.add(STARTTIME_ATTRIB);
-        names.add(XML_ATTRIB);
+        final SortedSet<String> names = getAttributeNames() ;
         MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[names.size()];
         Iterator<String> it = names.iterator();
         for (int i = 0; i < attrs.length; i++) {
@@ -248,53 +245,84 @@
             attrs[i] = new MBeanAttributeInfo(
                     name, "java.lang.String", "Property " + name, true, false, false);
         }
-        MBeanOperationInfo[] opers = {
-            new MBeanOperationInfo(
-            		START_ACTION, "Start the lifecycle",
-                    null, "void", MBeanOperationInfo.ACTION),
-
-            new MBeanOperationInfo(
-            		STOP_ACTION, "Stop the lifecycle",
-                    null, "void", MBeanOperationInfo.ACTION),
-        };
+        final List<MBeanOperationInfo> opers = getOperations() ;
         return new MBeanInfo(
                 this.getClass().getName(), "Lifecycle Controller MBean",
-                attrs, null, opers, null); // notifications
+                attrs, null, opers.toArray(new MBeanOperationInfo[opers.size()]), null); // notifications
 	}
+    
+    /**
+     * Get the list of attribute names
+     * @return The list of attribute names
+     */
+    protected SortedSet<String> getAttributeNames()
+    {
+        final SortedSet<String> names = new TreeSet<String>();
+        for (Object name : m_lifecycle.getConfig().getAttributeNames())
+            names.add((String) name);
+        names.add(LIFECYCLESTATE_ATTRIB);
+        names.add(STARTTIME_ATTRIB);
+        names.add(XML_ATTRIB);
+        return names ;
+    }
+    
+    /**
+     * Get the list of operations
+     * @return The list of operations
+     */
+    protected List<MBeanOperationInfo> getOperations()
+    {
+        final List<MBeanOperationInfo> opers = new ArrayList<MBeanOperationInfo>() ;
+        opers.add(new MBeanOperationInfo(
+            START_ACTION, "Start the lifecycle",
+            null, "void", MBeanOperationInfo.ACTION)) ;
 
+        opers.add(new MBeanOperationInfo(
+                STOP_ACTION, "Stop the lifecycle",
+                null, "void", MBeanOperationInfo.ACTION)) ;
+        return opers ;
+    }
+
     /**
-     * Invoke calls the four operations provided by the LifecycleController - 
-     * initialise, start, stop, destroy.    If one of the operation methods fails,
-     * we throw the exception, and if an unknown operation is called, we throw
-     * an exception.
+     * Invoke calls the operations provided by the LifecycleController
+     * If one of the operation methods fails, we throw the exception,
+     * and if an unknown operation is called, we throw an exception.
      */
 	public Object invoke(String method, Object[] arg1, String[] arg2) throws ReflectionException {
 		final ClassLoader current = Thread.currentThread().getContextClassLoader() ;
 		Thread.currentThread().setContextClassLoader(tccl) ;
 		try {
-			if (method.equalsIgnoreCase(START_ACTION)) {
-				try {
-					start();
-				} catch (ManagedLifecycleException e) {
-					logger.error("", e);
-					return "Error invoking " + method + ": " + e.toString();
-				}
-				return "Invoking the " + method + " on the lifecycle.";
-			} else if (method.equalsIgnoreCase(STOP_ACTION)) {
-				try {
-					stop();
-				} catch (ManagedLifecycleException e) {
-					logger.error("", e);
-					return "Error invoking " + method + ": " + e.toString();
-				}
-				return "Invoking the " + method + " on the lifecycle.";
-			} else {
-				throw new ReflectionException(new NoSuchMethodException(method));
-			}
+		    return invokeOperation(method, arg1, arg2) ;
 		} finally {
 			Thread.currentThread().setContextClassLoader(current) ;
 		}
 	}
+	
+	/**
+	 * Invoke the operations on the controller
+	 */
+	protected Object invokeOperation(final String method, final Object[] arg1, final Object[] arg2)
+	    throws ReflectionException {
+        if (START_ACTION.equals(method)) {
+            try {
+                start();
+            } catch (ManagedLifecycleException e) {
+                logger.error("", e);
+                return "Error invoking " + method + ": " + e.toString();
+            }
+            return "Invoking the " + method + " on the lifecycle.";
+        } else if (STOP_ACTION.equals(method)) {
+            try {
+                stop();
+            } catch (ManagedLifecycleException e) {
+                logger.error("", e);
+                return "Error invoking " + method + ": " + e.toString();
+            }
+            return "Invoking the " + method + " on the lifecycle.";
+        } else {
+            throw new ReflectionException(new NoSuchMethodException(method));
+        }
+	}
 
 	/**
 	 * This method is here to implement the DynamicMBean interface in full, but it is

Added: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedLifecycleController.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedLifecycleController.java	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedLifecycleController.java	2012-05-20 18:14:59 UTC (rev 38095)
@@ -0,0 +1,181 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.soa.esb.listeners.lifecycle;
+
+import java.util.List;
+import java.util.SortedSet;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanParameterInfo;
+import javax.management.ReflectionException;
+
+/**
+ * ThreadedLifecycleController is an MBean implementation that
+ * extends the base LifecycleController to enable alteration of the
+ * thread pool. 
+ *
+ * @author <a href="Kevin.Conner at jboss.com">Kevin Conner</a>
+ */
+public class ThreadedLifecycleController extends LifecycleController
+{
+    private ThreadedManagedLifecycleAdapter m_threadedLifecycle ;
+    
+    public static final String SET_MINIMUM_THREAD_POOL_COUNT_ACTION = "setMinimumThreadPoolCount" ;
+    public static final String SET_MAXIMUM_THREAD_POOL_COUNT_ACTION = "setMaximumThreadPoolCount" ;
+    
+    public static final String MINIMUM_THREAD_POOL_COUNT_ATTRIB = "minimumThreadPoolCount" ;
+    public static final String MAXIMUM_THREAD_POOL_COUNT_ATTRIB = "maximumThreadPoolCount" ;
+        
+    /**
+     * Constructor using lifecycle and config tree.
+     * @param f_lifecycle lifecycle
+     * @param f_configtree config tree
+     */
+    public ThreadedLifecycleController(final ThreadedManagedLifecycleAdapter f_threadedLifecycle)
+    {
+        super(f_threadedLifecycle) ;
+        m_threadedLifecycle = f_threadedLifecycle ;
+    }
+    
+    /**
+     * Lifecycle mutator.
+     * @param f_mla lifecycle
+     */
+    public void setLifecycle(final ManagedLifecycleAdapter f_mla)
+    {
+        if (f_mla instanceof ThreadedManagedLifecycleAdapter)
+        {
+            m_threadedLifecycle = (ThreadedManagedLifecycleAdapter)f_mla ;
+            super.setLifecycle(f_mla) ;
+        }
+        else
+        {
+            throw new IllegalArgumentException("Invalid managed lifecycle adapter : " + f_mla) ;
+        }
+    }
+
+    /**
+     * Get the list of attributes.
+     */
+    public AttributeList getAttributes(final String[] arg0)
+    {
+        final AttributeList attributeList = super.getAttributes(arg0) ;
+        
+        final Attribute minimumThreadPoolCount = new Attribute(MINIMUM_THREAD_POOL_COUNT_ATTRIB, m_threadedLifecycle.getMinimumThreadPoolCount()) ;
+        attributeList.add(minimumThreadPoolCount) ;
+
+        final Attribute maximumThreadPoolCount = new Attribute(MAXIMUM_THREAD_POOL_COUNT_ATTRIB, m_threadedLifecycle.getMaximumThreadPoolCount()) ;
+        attributeList.add(maximumThreadPoolCount) ;
+
+        return attributeList ;
+    }
+
+    /**
+     * Gets the attribute value.   
+     */
+    public synchronized String getAttribute(final String name)
+        throws AttributeNotFoundException
+    {
+        final String value ;
+        if (MINIMUM_THREAD_POOL_COUNT_ATTRIB.equals(name))
+        {
+            value = Integer.toString(m_threadedLifecycle.getMinimumThreadPoolCount()) ;
+        }
+        else if (MAXIMUM_THREAD_POOL_COUNT_ATTRIB.equals(name))
+        {
+            value = Integer.toString(m_threadedLifecycle.getMaximumThreadPoolCount()) ;
+        }
+        else
+        {
+            value = super.getAttribute(name) ;
+        }
+        return value ;
+    }
+    /**
+     * Get the list of attribute names
+     * @return The list of attribute names
+     */
+    protected SortedSet<String> getAttributeNames()
+    {
+        final SortedSet<String> names = super.getAttributeNames() ;
+        names.add(MINIMUM_THREAD_POOL_COUNT_ATTRIB) ;
+        names.add(MAXIMUM_THREAD_POOL_COUNT_ATTRIB) ;
+        return names ;
+    }
+    
+    /**
+     * Get the list of operations
+     * @return The list of operations
+     */
+    protected List<MBeanOperationInfo> getOperations()
+    {
+        final List<MBeanOperationInfo> opers = super.getOperations() ;
+        opers.add(new MBeanOperationInfo(
+            SET_MINIMUM_THREAD_POOL_COUNT_ACTION,
+            "Set the minimum thread pool size",
+            new MBeanParameterInfo[] { new MBeanParameterInfo(
+                MINIMUM_THREAD_POOL_COUNT_ATTRIB, "int", "Minimum thread pool count") },
+            "void", MBeanOperationInfo.ACTION)) ;
+        opers.add(new MBeanOperationInfo(
+            SET_MAXIMUM_THREAD_POOL_COUNT_ACTION,
+            "Set the maximum thread pool size",
+            new MBeanParameterInfo[] { new MBeanParameterInfo(
+                MAXIMUM_THREAD_POOL_COUNT_ATTRIB, "int", "Maximum thread pool count") },
+            "void", MBeanOperationInfo.ACTION)) ;
+        return opers ;
+    }
+
+    /**
+     * Invoke the operations on the controller
+     */
+    protected Object invokeOperation(final String method, final Object[] arg1, final Object[] arg2)
+        throws ReflectionException
+    {
+        if (SET_MINIMUM_THREAD_POOL_COUNT_ACTION.equals(method))
+        {
+            if ((arg1 == null) || (arg1.length != 1) || !(arg1[0] instanceof Integer))
+            {
+                return "Invalid parameter to " + SET_MINIMUM_THREAD_POOL_COUNT_ACTION + " action" ;
+            }
+            final Integer intVal = (Integer) arg1[0] ;
+            m_threadedLifecycle.setMinimumThreadPoolCount(intVal.intValue()) ;
+            return "Invoking the " + method + " on the lifecycle." ;
+        }
+        else if (SET_MAXIMUM_THREAD_POOL_COUNT_ACTION.equals(method))
+        {
+            if ((arg1 == null) || (arg1.length != 1) || !(arg1[0] instanceof Integer))
+            {
+                return "Invalid parameter to " + SET_MAXIMUM_THREAD_POOL_COUNT_ACTION + " action" ;
+            }
+            final Integer intVal = (Integer) arg1[0] ;
+            m_threadedLifecycle.setMaximumThreadPoolCount(intVal.intValue()) ;
+            return "Invoking the " + method + " on the lifecycle." ;
+        }
+        else
+        {
+            return super.invokeOperation(method, arg1, arg2) ;
+        }
+    }
+}

Added: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedManagedLifecycleAdapter.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedManagedLifecycleAdapter.java	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedManagedLifecycleAdapter.java	2012-05-20 18:14:59 UTC (rev 38095)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.soa.esb.listeners.lifecycle;
+
+
+/**
+ * Adapter interface used for management through lifecycle MBean
+ */
+public interface ThreadedManagedLifecycleAdapter extends ManagedLifecycleAdapter
+{
+    /**
+     * Get the minimum thread pool count.
+     * @return the minimum thread pool count.
+     */
+    public int getMinimumThreadPoolCount() ;
+    /**
+     * Get the maximum thread pool count.
+     * @return the maximum thread pool count.
+     */
+    public int getMaximumThreadPoolCount() ;
+    /**
+     * Get the minimum thread pool count.
+     * @param minimumThreadCount the minimum thread pool count.
+     */
+    public void setMinimumThreadPoolCount(final int minimumThreadCount) ;
+    /**
+     * Get the maximum thread pool count.
+     * @param maximumThreadCount the maximum thread pool count.
+     */
+    public void setMaximumThreadPoolCount(final int maximumThreadCount) ;
+}
\ No newline at end of file

Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2012-05-18 17:13:22 UTC (rev 38094)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2012-05-20 18:14:59 UTC (rev 38095)
@@ -23,9 +23,9 @@
 package org.jboss.soa.esb.listeners.message;
 
 import java.lang.reflect.Method;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
@@ -45,8 +45,12 @@
 import org.jboss.soa.esb.listeners.ListenerUtil;
 import org.jboss.soa.esb.listeners.RegistryUtil;
 import org.jboss.soa.esb.listeners.lifecycle.AbstractThreadedManagedLifecycle;
+import org.jboss.soa.esb.listeners.lifecycle.LifecycleController;
 import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
+import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleState;
 import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleThreadState;
+import org.jboss.soa.esb.listeners.lifecycle.ThreadedLifecycleController;
+import org.jboss.soa.esb.listeners.lifecycle.ThreadedManagedLifecycleAdapter;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.services.registry.RegistryException;
 import org.jboss.soa.esb.util.Util;
@@ -173,6 +177,15 @@
 	}
 
         /**
+         * Get a lifecycle controller for this lifecycle
+         * @return The lifecycle controller
+         */
+        protected LifecycleController getLifecycleController()
+        {
+            return new ThreadedLifecycleController(new ThreadedLifecycleControllerAdapter()) ;        
+        }
+        
+        /**
          * Handle the initialisation of the managed instance.
          *
          * @throws ManagedLifecycleException for errors while initialisation.
@@ -230,7 +243,7 @@
         {
             checkExecutorTermination() ;
 
-            _execService = Executors.newFixedThreadPool(_maxThreads) ;
+            _execService = new ThreadPoolExecutor(_minThreads, _maxThreads, 5, TimeUnit.MINUTES, new SynchronousQueue<Runnable>()) ;
 
             super.doStart() ;
         }
@@ -465,6 +478,30 @@
             }
         }
         
+        public void setMinimumThreadCount(final int minimumThreadCount)
+        {
+            synchronized (_synchThreads)
+            {
+                if (_execService != null)
+                {
+                    _execService.setCorePoolSize(minimumThreadCount) ;
+                }
+                _minThreads = minimumThreadCount ;
+            }
+        }
+        
+        public void setMaximumThreadCount(final int maximumThreadCount)
+        {
+            synchronized (_synchThreads)
+            {
+                if (_execService != null)
+                {
+                    _execService.setMaximumPoolSize(maximumThreadCount) ;
+                }
+                _maxThreads = maximumThreadCount ;
+            }
+        }
+        
         private void rollbackTransaction ()
         {
         	try
@@ -589,6 +626,55 @@
         	private Object _txHandle;
         }
         
+        private final class ThreadedLifecycleControllerAdapter implements ThreadedManagedLifecycleAdapter
+        {
+            public void start()
+                throws ManagedLifecycleException
+            {
+                MessageAwareListener.this.start() ;
+            }
+
+            public void stop()
+                throws ManagedLifecycleException
+            {
+                MessageAwareListener.this.stop() ;
+            }
+
+            public ManagedLifecycleState getState()
+            {
+                return MessageAwareListener.this.getState() ;
+            }
+
+            public ConfigTree getConfig()
+            {
+                return MessageAwareListener.this.getConfig() ;
+            }
+
+            @Override
+            public int getMinimumThreadPoolCount()
+            {
+                return _minThreads ;
+            }
+
+            @Override
+            public int getMaximumThreadPoolCount()
+            {
+                return _maxThreads ;
+            }
+
+            @Override
+            public void setMinimumThreadPoolCount(int minimumThreadCount)
+            {
+                setMinimumThreadCount(minimumThreadCount) ;
+            }
+
+            @Override
+            public void setMaximumThreadPoolCount(int maximumThreadCount)
+            {
+                setMaximumThreadCount(maximumThreadCount) ;
+            }
+        }
+        
         private ConfigTree _config;
 
         private String _eprCategoryName;
@@ -597,7 +683,8 @@
 
         private EPR _epr;
 
-        private int _maxThreads;
+        private volatile int _minThreads = 1 ;
+        private volatile int _maxThreads;
 
         private int _defaultMaxThreads = 1;
 
@@ -605,7 +692,7 @@
 
         private long _pauseLapseInMillis = 50 ;
 
-        private ExecutorService _execService;
+        private ThreadPoolExecutor _execService;
 
         private byte[] _synchThreads = new byte[0];
 

Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java	2012-05-18 17:13:22 UTC (rev 38094)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java	2012-05-20 18:14:59 UTC (rev 38095)
@@ -18,8 +18,15 @@
 package org.jboss.soa.esb.listeners.message;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
+
+import java.util.Stack;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import junit.framework.JUnit4TestAdapter;
 
 import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
@@ -94,6 +101,80 @@
         assertEquals("Message and execution threads", courier.getThread(), MessageAwareListenerTestAction.getThread()) ;
     }
     
+    @Test
+    public void testMaximumThreadPoolCount()
+        throws Exception
+    {
+        final ConfigTree config = new ConfigTree("MessageAwareListener") ;
+        final String category = "TestCategory" ;
+        final String service = "TestName" ;
+        
+        config.setAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG, category) ;
+        config.setAttribute(ListenerTagNames.SERVICE_NAME_TAG, service) ;
+        config.setAttribute(ListenerTagNames.MAX_THREADS_TAG, "1") ;
+        
+        final ConfigTree configEPR = new ConfigTree(ListenerTagNames.EPR_TAG, config) ;
+        configEPR.setAttribute(ListenerTagNames.URL_TAG, "invm://test-epr") ;
+        
+        final ConfigTree actionTree = new ConfigTree(ListenerTagNames.ACTION_ELEMENT_TAG, config) ;
+        actionTree.setAttribute(ListenerTagNames.ACTION_ELEMENT_TAG, "TestAction");
+        actionTree.setAttribute(ListenerTagNames.ACTION_CLASS_TAG, MessageAwareListenerTestThreadCountAction.class.getName());
+
+        final MessageAwareListenerMockThreadCountCourier courier = new MessageAwareListenerMockThreadCountCourier() ;
+        final MessageAwareListenerTestListener listener = new MessageAwareListenerTestListener(config, courier) ;
+        
+        listener.initialise() ;
+        
+        courier.add(MessageFactory.getInstance().getMessage()) ;
+        courier.add(MessageFactory.getInstance().getMessage()) ;
+        
+        listener.start() ;
+        
+        sleep(5000) ;
+        
+        assertEquals("barrier waiting", 1, MessageAwareListenerTestThreadCountAction.BARRIER.getNumberWaiting()) ;
+        assertEquals("message count", 1, courier.getMessageCount()) ;
+        
+        courier.clearMessages() ;
+        MessageAwareListenerTestThreadCountAction.BARRIER.reset() ;
+        
+        listener.setMaximumThreadCount(2) ;
+
+        courier.add(MessageFactory.getInstance().getMessage()) ;
+        courier.add(MessageFactory.getInstance().getMessage()) ;
+        
+        sleep(5000) ;
+        
+        assertEquals("message count", 0, courier.getMessageCount()) ;
+        assertEquals("barrier waiting", 0, MessageAwareListenerTestThreadCountAction.BARRIER.getNumberWaiting()) ;
+
+        listener.setMaximumThreadCount(1) ;
+
+        courier.add(MessageFactory.getInstance().getMessage()) ;
+        courier.add(MessageFactory.getInstance().getMessage()) ;
+        
+        sleep(5000) ;
+        
+        assertEquals("message count", 1, courier.getMessageCount()) ;
+        assertEquals("barrier waiting", 1, MessageAwareListenerTestThreadCountAction.BARRIER.getNumberWaiting()) ;
+        
+        courier.clearMessages() ;
+        MessageAwareListenerTestThreadCountAction.BARRIER.reset() ;
+        
+        listener.stop() ;
+        
+        listener.destroy() ;
+    }
+    
+    private void sleep(final long millis)
+    {
+        try
+        {
+            Thread.sleep(millis) ;
+        }
+        catch (final InterruptedException ie) {}
+    }
+    
     public static junit.framework.Test suite()
     {
         return new JUnit4TestAdapter(MessageAwareListenerUnitTest.class) ;
@@ -221,4 +302,64 @@
             return thread ;
         }
     }
+
+    public static final class MessageAwareListenerMockThreadCountCourier implements PickUpOnlyCourier
+    {
+        private final Stack<Message> messages = new Stack<Message>() ;
+        
+        public Message pickup(final long millis)
+            throws CourierException, CourierTimeoutException
+        {
+            if (messages.empty())
+            {
+                return null ;
+            }
+            else
+            {
+                return messages.pop() ;
+            }
+        }
+        
+        public void add(final Message message)
+        {
+            messages.push(message) ;
+        }
+        
+        public int getMessageCount()
+        {
+            return messages.size() ;
+        }
+        
+        public void clearMessages()
+        {
+            messages.clear() ;
+        }
+
+        public void cleanup()
+        {
+        }
+    }
+    
+    public static final class MessageAwareListenerTestThreadCountAction extends AbstractActionPipelineProcessor
+    {
+        public static CyclicBarrier BARRIER = new CyclicBarrier(2) ;
+        
+        public MessageAwareListenerTestThreadCountAction(final ConfigTree config)
+        {
+        }
+        
+        public Message process(final Message message)
+            throws ActionProcessingException
+        {
+            try
+            {
+                    BARRIER.await(20, TimeUnit.SECONDS) ;
+            }
+            catch (final TimeoutException te) {} // ignore
+            catch (final InterruptedException ie) {} // ignore
+            catch (final BrokenBarrierException bbe) {} // ignore
+            
+            return message ;
+        }
+    }
 }



More information about the jboss-svn-commits mailing list