[jboss-svn-commits] JBL Code SVN: r38095 - in labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta: src/org/jboss/soa/esb/listeners/message and 1 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sun May 20 14:15:02 EDT 2012
Author: kevin.conner at jboss.com
Date: 2012-05-20 14:14:59 -0400 (Sun, 20 May 2012)
New Revision: 38095
Added:
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedLifecycleController.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedManagedLifecycleAdapter.java
Modified:
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/AbstractManagedLifecycle.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/LifecycleController.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java
Log:
Added support for altering the minimum/maximum thread count for a MessageAwareListener: JBESB-3785
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/AbstractManagedLifecycle.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/AbstractManagedLifecycle.java 2012-05-18 17:13:22 UTC (rev 38094)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/AbstractManagedLifecycle.java 2012-05-20 18:14:59 UTC (rev 38095)
@@ -110,8 +110,6 @@
}
this.config = config;
-
- lifecycleController = new LifecycleController(new LifecycleControllerAdapter());
}
/**
@@ -128,6 +126,8 @@
if (!ManagedLifecycleState.INITIALISED.equals(getState()))
{
changeState(ManagedLifecycleState.INITIALISING) ;
+
+ lifecycleController = getLifecycleController() ;
try
{
doInitialise() ;
@@ -149,6 +149,15 @@
}
/**
+ * Create the lifecycle controller for this lifecycle (a new instance is created on each call)
+ * @return The lifecycle controller
+ */
+ protected LifecycleController getLifecycleController()
+ {
+ return new LifecycleController(new LifecycleControllerAdapter()) ;
+ }
+
+ /**
* Handle the initialisation of the managed instance.
*
* @throws ManagedLifecycleException for errors while initialisation.
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/LifecycleController.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/LifecycleController.java 2012-05-18 17:13:22 UTC (rev 38094)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/LifecycleController.java 2012-05-20 18:14:59 UTC (rev 38095)
@@ -22,7 +22,9 @@
package org.jboss.soa.esb.listeners.lifecycle;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -235,12 +237,7 @@
* invocation methods - start/stop/initialise/destroy on the Lifecycle.
*/
public MBeanInfo getMBeanInfo() {
- SortedSet<String> names = new TreeSet<String>();
- for (Object name : m_lifecycle.getConfig().getAttributeNames())
- names.add((String) name);
- names.add(LIFECYCLESTATE_ATTRIB);
- names.add(STARTTIME_ATTRIB);
- names.add(XML_ATTRIB);
+ final SortedSet<String> names = getAttributeNames() ;
MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[names.size()];
Iterator<String> it = names.iterator();
for (int i = 0; i < attrs.length; i++) {
@@ -248,53 +245,84 @@
attrs[i] = new MBeanAttributeInfo(
name, "java.lang.String", "Property " + name, true, false, false);
}
- MBeanOperationInfo[] opers = {
- new MBeanOperationInfo(
- START_ACTION, "Start the lifecycle",
- null, "void", MBeanOperationInfo.ACTION),
-
- new MBeanOperationInfo(
- STOP_ACTION, "Stop the lifecycle",
- null, "void", MBeanOperationInfo.ACTION),
- };
+ final List<MBeanOperationInfo> opers = getOperations() ;
return new MBeanInfo(
this.getClass().getName(), "Lifecycle Controller MBean",
- attrs, null, opers, null); // notifications
+ attrs, null, opers.toArray(new MBeanOperationInfo[opers.size()]), null); // notifications
}
+
+ /**
+ * Get the list of attribute names
+ * @return The list of attribute names
+ */
+ protected SortedSet<String> getAttributeNames()
+ {
+ final SortedSet<String> names = new TreeSet<String>();
+ for (Object name : m_lifecycle.getConfig().getAttributeNames())
+ names.add((String) name);
+ names.add(LIFECYCLESTATE_ATTRIB);
+ names.add(STARTTIME_ATTRIB);
+ names.add(XML_ATTRIB);
+ return names ;
+ }
+
+ /**
+ * Get the list of operations
+ * @return The list of operations
+ */
+ protected List<MBeanOperationInfo> getOperations()
+ {
+ final List<MBeanOperationInfo> opers = new ArrayList<MBeanOperationInfo>() ;
+ opers.add(new MBeanOperationInfo(
+ START_ACTION, "Start the lifecycle",
+ null, "void", MBeanOperationInfo.ACTION)) ;
+ opers.add(new MBeanOperationInfo(
+ STOP_ACTION, "Stop the lifecycle",
+ null, "void", MBeanOperationInfo.ACTION)) ;
+ return opers ;
+ }
+
/**
- * Invoke calls the four operations provided by the LifecycleController -
- * initialise, start, stop, destroy. If one of the operation methods fails,
- * we throw the exception, and if an unknown operation is called, we throw
- * an exception.
+ * Invoke calls the operations provided by the LifecycleController.
+ * If a start or stop operation fails, the error is logged and reported in the
+ * returned string; if an unknown operation is called, we throw an exception.
*/
public Object invoke(String method, Object[] arg1, String[] arg2) throws ReflectionException {
final ClassLoader current = Thread.currentThread().getContextClassLoader() ;
Thread.currentThread().setContextClassLoader(tccl) ;
try {
- if (method.equalsIgnoreCase(START_ACTION)) {
- try {
- start();
- } catch (ManagedLifecycleException e) {
- logger.error("", e);
- return "Error invoking " + method + ": " + e.toString();
- }
- return "Invoking the " + method + " on the lifecycle.";
- } else if (method.equalsIgnoreCase(STOP_ACTION)) {
- try {
- stop();
- } catch (ManagedLifecycleException e) {
- logger.error("", e);
- return "Error invoking " + method + ": " + e.toString();
- }
- return "Invoking the " + method + " on the lifecycle.";
- } else {
- throw new ReflectionException(new NoSuchMethodException(method));
- }
+ return invokeOperation(method, arg1, arg2) ;
} finally {
Thread.currentThread().setContextClassLoader(current) ;
}
}
+
+ /**
+ * Invoke the operations on the controller
+ */
+ protected Object invokeOperation(final String method, final Object[] arg1, final Object[] arg2)
+ throws ReflectionException {
+ if (START_ACTION.equals(method)) {
+ try {
+ start();
+ } catch (ManagedLifecycleException e) {
+ logger.error("", e);
+ return "Error invoking " + method + ": " + e.toString();
+ }
+ return "Invoking the " + method + " on the lifecycle.";
+ } else if (STOP_ACTION.equals(method)) {
+ try {
+ stop();
+ } catch (ManagedLifecycleException e) {
+ logger.error("", e);
+ return "Error invoking " + method + ": " + e.toString();
+ }
+ return "Invoking the " + method + " on the lifecycle.";
+ } else {
+ throw new ReflectionException(new NoSuchMethodException(method));
+ }
+ }
/**
* This method is here to implement the DynamicMBean interface in full, but it is
Added: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedLifecycleController.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedLifecycleController.java (rev 0)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedLifecycleController.java 2012-05-20 18:14:59 UTC (rev 38095)
@@ -0,0 +1,181 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.soa.esb.listeners.lifecycle;
+
+import java.util.List;
+import java.util.SortedSet;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanParameterInfo;
+import javax.management.ReflectionException;
+
+/**
+ * ThreadedLifecycleController is an MBean implementation that
+ * extends the base LifecycleController to enable alteration of the
+ * thread pool.
+ *
+ * @author <a href="Kevin.Conner at jboss.com">Kevin Conner</a>
+ */
+public class ThreadedLifecycleController extends LifecycleController
+{
+ private ThreadedManagedLifecycleAdapter m_threadedLifecycle ;
+
+ public static final String SET_MINIMUM_THREAD_POOL_COUNT_ACTION = "setMinimumThreadPoolCount" ;
+ public static final String SET_MAXIMUM_THREAD_POOL_COUNT_ACTION = "setMaximumThreadPoolCount" ;
+
+ public static final String MINIMUM_THREAD_POOL_COUNT_ATTRIB = "minimumThreadPoolCount" ;
+ public static final String MAXIMUM_THREAD_POOL_COUNT_ATTRIB = "maximumThreadPoolCount" ;
+
+ /**
+ * Constructor using the threaded lifecycle adapter.
+ * @param f_threadedLifecycle threaded lifecycle adapter, also passed to the base controller;
+ * the thread pool counts are read and altered through it
+ */
+ public ThreadedLifecycleController(final ThreadedManagedLifecycleAdapter f_threadedLifecycle)
+ {
+ super(f_threadedLifecycle) ;
+ m_threadedLifecycle = f_threadedLifecycle ;
+ }
+
+ /**
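+ * Lifecycle mutator; only a ThreadedManagedLifecycleAdapter is accepted.
+ * @param f_mla lifecycle, an IllegalArgumentException is thrown if it is not a ThreadedManagedLifecycleAdapter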
+ */
+ public void setLifecycle(final ManagedLifecycleAdapter f_mla)
+ {
+ if (f_mla instanceof ThreadedManagedLifecycleAdapter)
+ {
+ m_threadedLifecycle = (ThreadedManagedLifecycleAdapter)f_mla ;
+ super.setLifecycle(f_mla) ;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid managed lifecycle adapter : " + f_mla) ;
+ }
+ }
+
+ /**
+ * Get the list of attributes from the base controller, with the minimum and maximum thread pool counts always appended.
+ */
+ public AttributeList getAttributes(final String[] arg0)
+ {
+ final AttributeList attributeList = super.getAttributes(arg0) ;
+
+ final Attribute minimumThreadPoolCount = new Attribute(MINIMUM_THREAD_POOL_COUNT_ATTRIB, m_threadedLifecycle.getMinimumThreadPoolCount()) ;
+ attributeList.add(minimumThreadPoolCount) ;
+
+ final Attribute maximumThreadPoolCount = new Attribute(MAXIMUM_THREAD_POOL_COUNT_ATTRIB, m_threadedLifecycle.getMaximumThreadPoolCount()) ;
+ attributeList.add(maximumThreadPoolCount) ;
+
+ return attributeList ;
+ }
+
+ /**
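+ * Gets the attribute value as a string; the thread pool counts are handled here, any other name is passed to the base controller.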
+ */
+ public synchronized String getAttribute(final String name)
+ throws AttributeNotFoundException
+ {
+ final String value ;
+ if (MINIMUM_THREAD_POOL_COUNT_ATTRIB.equals(name))
+ {
+ value = Integer.toString(m_threadedLifecycle.getMinimumThreadPoolCount()) ;
+ }
+ else if (MAXIMUM_THREAD_POOL_COUNT_ATTRIB.equals(name))
+ {
+ value = Integer.toString(m_threadedLifecycle.getMaximumThreadPoolCount()) ;
+ }
+ else
+ {
+ value = super.getAttribute(name) ;
+ }
+ return value ;
+ }
+ /**
+ * Get the list of attribute names
+ * @return The list of attribute names
+ */
+ protected SortedSet<String> getAttributeNames()
+ {
+ final SortedSet<String> names = super.getAttributeNames() ;
+ names.add(MINIMUM_THREAD_POOL_COUNT_ATTRIB) ;
+ names.add(MAXIMUM_THREAD_POOL_COUNT_ATTRIB) ;
+ return names ;
+ }
+
+ /**
+ * Get the list of operations
+ * @return The list of operations
+ */
+ protected List<MBeanOperationInfo> getOperations()
+ {
+ final List<MBeanOperationInfo> opers = super.getOperations() ;
+ opers.add(new MBeanOperationInfo(
+ SET_MINIMUM_THREAD_POOL_COUNT_ACTION,
+ "Set the minimum thread pool size",
+ new MBeanParameterInfo[] { new MBeanParameterInfo(
+ MINIMUM_THREAD_POOL_COUNT_ATTRIB, "int", "Minimum thread pool count") },
+ "void", MBeanOperationInfo.ACTION)) ;
+ opers.add(new MBeanOperationInfo(
+ SET_MAXIMUM_THREAD_POOL_COUNT_ACTION,
+ "Set the maximum thread pool size",
+ new MBeanParameterInfo[] { new MBeanParameterInfo(
+ MAXIMUM_THREAD_POOL_COUNT_ATTRIB, "int", "Maximum thread pool count") },
+ "void", MBeanOperationInfo.ACTION)) ;
+ return opers ;
+ }
+
+ /**
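+ * Invoke the operations on the controller; the thread pool count setters are handled here (each expects a single Integer argument), any other operation is passed to the base controller.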
+ */
+ protected Object invokeOperation(final String method, final Object[] arg1, final Object[] arg2)
+ throws ReflectionException
+ {
+ if (SET_MINIMUM_THREAD_POOL_COUNT_ACTION.equals(method))
+ {
+ if ((arg1 == null) || (arg1.length != 1) || !(arg1[0] instanceof Integer))
+ {
+ return "Invalid parameter to " + SET_MINIMUM_THREAD_POOL_COUNT_ACTION + " action" ;
+ }
+ final Integer intVal = (Integer) arg1[0] ;
+ m_threadedLifecycle.setMinimumThreadPoolCount(intVal.intValue()) ;
+ return "Invoking the " + method + " on the lifecycle." ;
+ }
+ else if (SET_MAXIMUM_THREAD_POOL_COUNT_ACTION.equals(method))
+ {
+ if ((arg1 == null) || (arg1.length != 1) || !(arg1[0] instanceof Integer))
+ {
+ return "Invalid parameter to " + SET_MAXIMUM_THREAD_POOL_COUNT_ACTION + " action" ;
+ }
+ final Integer intVal = (Integer) arg1[0] ;
+ m_threadedLifecycle.setMaximumThreadPoolCount(intVal.intValue()) ;
+ return "Invoking the " + method + " on the lifecycle." ;
+ }
+ else
+ {
+ return super.invokeOperation(method, arg1, arg2) ;
+ }
+ }
+}
Added: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedManagedLifecycleAdapter.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedManagedLifecycleAdapter.java (rev 0)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/ThreadedManagedLifecycleAdapter.java 2012-05-20 18:14:59 UTC (rev 38095)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.soa.esb.listeners.lifecycle;
+
+
+/**
+ * Adapter interface used for management of a lifecycle and its thread pool counts through the lifecycle MBean
+ */
+public interface ThreadedManagedLifecycleAdapter extends ManagedLifecycleAdapter
+{
+ /**
+ * Get the minimum thread pool count.
+ * @return the minimum thread pool count.
+ */
+ public int getMinimumThreadPoolCount() ;
+ /**
+ * Get the maximum thread pool count.
+ * @return the maximum thread pool count.
+ */
+ public int getMaximumThreadPoolCount() ;
+ /**
+ * Set the minimum thread pool count.
+ * @param minimumThreadCount the minimum thread pool count.
+ */
+ public void setMinimumThreadPoolCount(final int minimumThreadCount) ;
+ /**
+ * Set the maximum thread pool count.
+ * @param maximumThreadCount the maximum thread pool count.
+ */
+ public void setMaximumThreadPoolCount(final int maximumThreadCount) ;
+}
\ No newline at end of file
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2012-05-18 17:13:22 UTC (rev 38094)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2012-05-20 18:14:59 UTC (rev 38095)
@@ -23,9 +23,9 @@
package org.jboss.soa.esb.listeners.message;
import java.lang.reflect.Method;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
@@ -45,8 +45,12 @@
import org.jboss.soa.esb.listeners.ListenerUtil;
import org.jboss.soa.esb.listeners.RegistryUtil;
import org.jboss.soa.esb.listeners.lifecycle.AbstractThreadedManagedLifecycle;
+import org.jboss.soa.esb.listeners.lifecycle.LifecycleController;
import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
+import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleState;
import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleThreadState;
+import org.jboss.soa.esb.listeners.lifecycle.ThreadedLifecycleController;
+import org.jboss.soa.esb.listeners.lifecycle.ThreadedManagedLifecycleAdapter;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.util.Util;
@@ -173,6 +177,15 @@
}
/**
+ * Create the threaded lifecycle controller for this listener (a new instance is created on each call)
+ * @return The lifecycle controller
+ */
+ protected LifecycleController getLifecycleController()
+ {
+ return new ThreadedLifecycleController(new ThreadedLifecycleControllerAdapter()) ;
+ }
+
+ /**
* Handle the initialisation of the managed instance.
*
* @throws ManagedLifecycleException for errors while initialisation.
@@ -230,7 +243,7 @@
{
checkExecutorTermination() ;
- _execService = Executors.newFixedThreadPool(_maxThreads) ;
+ _execService = new ThreadPoolExecutor(_minThreads, _maxThreads, 5, TimeUnit.MINUTES, new SynchronousQueue<Runnable>()) ;
super.doStart() ;
}
@@ -465,6 +478,30 @@
}
}
+ public void setMinimumThreadCount(final int minimumThreadCount)
+ {
+ synchronized (_synchThreads)
+ {
+ if (_execService != null)
+ {
+ _execService.setCorePoolSize(minimumThreadCount) ;
+ }
+ _minThreads = minimumThreadCount ;
+ }
+ }
+
+ public void setMaximumThreadCount(final int maximumThreadCount)
+ {
+ synchronized (_synchThreads)
+ {
+ if (_execService != null)
+ {
+ _execService.setMaximumPoolSize(maximumThreadCount) ;
+ }
+ _maxThreads = maximumThreadCount ;
+ }
+ }
+
private void rollbackTransaction ()
{
try
@@ -589,6 +626,55 @@
private Object _txHandle;
}
+ private final class ThreadedLifecycleControllerAdapter implements ThreadedManagedLifecycleAdapter
+ {
+ public void start()
+ throws ManagedLifecycleException
+ {
+ MessageAwareListener.this.start() ;
+ }
+
+ public void stop()
+ throws ManagedLifecycleException
+ {
+ MessageAwareListener.this.stop() ;
+ }
+
+ public ManagedLifecycleState getState()
+ {
+ return MessageAwareListener.this.getState() ;
+ }
+
+ public ConfigTree getConfig()
+ {
+ return MessageAwareListener.this.getConfig() ;
+ }
+
+ @Override
+ public int getMinimumThreadPoolCount()
+ {
+ return _minThreads ;
+ }
+
+ @Override
+ public int getMaximumThreadPoolCount()
+ {
+ return _maxThreads ;
+ }
+
+ @Override
+ public void setMinimumThreadPoolCount(int minimumThreadCount)
+ {
+ setMinimumThreadCount(minimumThreadCount) ;
+ }
+
+ @Override
+ public void setMaximumThreadPoolCount(int maximumThreadCount)
+ {
+ setMaximumThreadCount(maximumThreadCount) ;
+ }
+ }
+
private ConfigTree _config;
private String _eprCategoryName;
@@ -597,7 +683,8 @@
private EPR _epr;
- private int _maxThreads;
+ private volatile int _minThreads = 1 ;
+ private volatile int _maxThreads;
private int _defaultMaxThreads = 1;
@@ -605,7 +692,7 @@
private long _pauseLapseInMillis = 50 ;
- private ExecutorService _execService;
+ private ThreadPoolExecutor _execService;
private byte[] _synchThreads = new byte[0];
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java 2012-05-18 17:13:22 UTC (rev 38094)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java 2012-05-20 18:14:59 UTC (rev 38095)
@@ -18,8 +18,15 @@
package org.jboss.soa.esb.listeners.message;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
+
+import java.util.Stack;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
import junit.framework.JUnit4TestAdapter;
import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
@@ -94,6 +101,80 @@
assertEquals("Message and execution threads", courier.getThread(), MessageAwareListenerTestAction.getThread()) ;
}
+ @Test
+ public void testMaximumThreadPoolCount()
+ throws Exception
+ {
+ final ConfigTree config = new ConfigTree("MessageAwareListener") ;
+ final String category = "TestCategory" ;
+ final String service = "TestName" ;
+
+ config.setAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG, category) ;
+ config.setAttribute(ListenerTagNames.SERVICE_NAME_TAG, service) ;
+ config.setAttribute(ListenerTagNames.MAX_THREADS_TAG, "1") ;
+
+ final ConfigTree configEPR = new ConfigTree(ListenerTagNames.EPR_TAG, config) ;
+ configEPR.setAttribute(ListenerTagNames.URL_TAG, "invm://test-epr") ;
+
+ final ConfigTree actionTree = new ConfigTree(ListenerTagNames.ACTION_ELEMENT_TAG, config) ;
+ actionTree.setAttribute(ListenerTagNames.ACTION_ELEMENT_TAG, "TestAction");
+ actionTree.setAttribute(ListenerTagNames.ACTION_CLASS_TAG, MessageAwareListenerTestThreadCountAction.class.getName());
+
+ final MessageAwareListenerMockThreadCountCourier courier = new MessageAwareListenerMockThreadCountCourier() ;
+ final MessageAwareListenerTestListener listener = new MessageAwareListenerTestListener(config, courier) ;
+
+ listener.initialise() ;
+
+ courier.add(MessageFactory.getInstance().getMessage()) ;
+ courier.add(MessageFactory.getInstance().getMessage()) ;
+
+ listener.start() ;
+
+ sleep(5000) ;
+
+ assertEquals("barrier waiting", 1, MessageAwareListenerTestThreadCountAction.BARRIER.getNumberWaiting()) ;
+ assertEquals("message count", 1, courier.getMessageCount()) ;
+
+ courier.clearMessages() ;
+ MessageAwareListenerTestThreadCountAction.BARRIER.reset() ;
+
+ listener.setMaximumThreadCount(2) ;
+
+ courier.add(MessageFactory.getInstance().getMessage()) ;
+ courier.add(MessageFactory.getInstance().getMessage()) ;
+
+ sleep(5000) ;
+
+ assertEquals("message count", 0, courier.getMessageCount()) ;
+ assertEquals("barrier waiting", 0, MessageAwareListenerTestThreadCountAction.BARRIER.getNumberWaiting()) ;
+
+ listener.setMaximumThreadCount(1) ;
+
+ courier.add(MessageFactory.getInstance().getMessage()) ;
+ courier.add(MessageFactory.getInstance().getMessage()) ;
+
+ sleep(5000) ;
+
+ assertEquals("message count", 1, courier.getMessageCount()) ;
+ assertEquals("barrier waiting", 1, MessageAwareListenerTestThreadCountAction.BARRIER.getNumberWaiting()) ;
+
+ courier.clearMessages() ;
+ MessageAwareListenerTestThreadCountAction.BARRIER.reset() ;
+
+ listener.stop() ;
+
+ listener.destroy() ;
+ }
+
+ private void sleep(final long millis)
+ {
+ try
+ {
+ Thread.sleep(millis) ;
+ }
+ catch (final InterruptedException ie) {}
+ }
+
public static junit.framework.Test suite()
{
return new JUnit4TestAdapter(MessageAwareListenerUnitTest.class) ;
@@ -221,4 +302,64 @@
return thread ;
}
}
+
+ public static final class MessageAwareListenerMockThreadCountCourier implements PickUpOnlyCourier
+ {
+ private final Stack<Message> messages = new Stack<Message>() ;
+
+ public Message pickup(final long millis)
+ throws CourierException, CourierTimeoutException
+ {
+ if (messages.empty())
+ {
+ return null ;
+ }
+ else
+ {
+ return messages.pop() ;
+ }
+ }
+
+ public void add(final Message message)
+ {
+ messages.push(message) ;
+ }
+
+ public int getMessageCount()
+ {
+ return messages.size() ;
+ }
+
+ public void clearMessages()
+ {
+ messages.clear() ;
+ }
+
+ public void cleanup()
+ {
+ }
+ }
+
+ public static final class MessageAwareListenerTestThreadCountAction extends AbstractActionPipelineProcessor
+ {
+ public static CyclicBarrier BARRIER = new CyclicBarrier(2) ;
+
+ public MessageAwareListenerTestThreadCountAction(final ConfigTree config)
+ {
+ }
+
+ public Message process(final Message message)
+ throws ActionProcessingException
+ {
+ try
+ {
+ BARRIER.await(20, TimeUnit.SECONDS) ;
+ }
+ catch (final TimeoutException te) {} // ignore
+ catch (final InterruptedException ie) {} // ignore
+ catch (final BrokenBarrierException bbe) {} // ignore
+
+ return message ;
+ }
+ }
}
More information about the jboss-svn-commits
mailing list