[jboss-cvs] JBoss Messaging SVN: r4493 - in branches/Branch_Experimental_JBMESSAGING_1356_2: src/etc/xmdesc and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 16 21:35:39 EDT 2008
Author: ovidiu.feodorov at jboss.com
Date: 2008-06-16 21:35:38 -0400 (Mon, 16 Jun 2008)
New Revision: 4493
Added:
branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/NoPooledExecutorTest.java
Modified:
branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/server/default/deploy/messaging-service.xml
branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/xmdesc/ServerPeer-xmbean.xml
branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java
Log:
Modified ServerPeer to accept a MaximumThreadPoolSize JMX attribute. If the attribute is not specified, or is 0, the behavior is the same as in previous versions: one QueuedExecutor per session endpoint.
Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/server/default/deploy/messaging-service.xml
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/server/default/deploy/messaging-service.xml 2008-06-17 01:28:31 UTC (rev 4492)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/server/default/deploy/messaging-service.xml 2008-06-17 01:35:38 UTC (rev 4493)
@@ -99,6 +99,14 @@
<attribute name="SuckerPassword"></attribute>
-->
+ <!--
+ The maximum number of threads shared by all session endpoints associated with this server peer.
+ Default is 0 - no pool, each session endpoint creates its own queued executor.
+ -->
+ <!--
+ <attribute name="MaximumThreadPoolSize">20</attribute>
+ -->
+
<depends optional-attribute-name="PersistenceManager">jboss.messaging:service=PersistenceManager</depends>
<depends optional-attribute-name="JMSUserManager">jboss.messaging:service=JMSUserManager</depends>
Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/xmdesc/ServerPeer-xmbean.xml 2008-06-17 01:28:31 UTC (rev 4492)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/xmdesc/ServerPeer-xmbean.xml 2008-06-17 01:35:38 UTC (rev 4493)
@@ -222,8 +222,13 @@
<description>The password used for message suckers</description>
<name>SuckerPassword</name>
<type>java.lang.String</type>
- </attribute>
+ </attribute>
+ <attribute access="read-write" getMethod="getMaximumThreadPoolSize" setMethod="setMaximumThreadPoolSize">
+ <description>The maximum number of threads shared by session endpoints. Zero means no pool, each endpoint has its own thread.</description>
+ <name>MaximumThreadPoolSize</name>
+ <type>int</type>
+ </attribute>
<!-- Managed operations -->
Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java 2008-06-17 01:28:31 UTC (rev 4492)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java 2008-06-17 01:35:38 UTC (rev 4493)
@@ -161,6 +161,8 @@
//From a system property - this overrides
private boolean strictTckProperty;
+ private int maximumThreadPoolSize;
+
// wired components
private DestinationJNDIMapper destinationJNDIMapper;
@@ -178,6 +180,7 @@
private ClusterNotifier clusterNotifier;
private FailoverWaiter failoverWaiter;
private RotatingID messageIDMgr;
+ private PooledExecutor pooledExecutor;
// plugins
@@ -229,6 +232,11 @@
log.debug(this + " starting");
+ if (maximumThreadPoolSize > 0)
+ {
+ pooledExecutor = new PooledExecutor(maximumThreadPoolSize);
+ }
+
loadClientAOPConfig();
loadServerAOPConfig();
@@ -386,6 +394,11 @@
MessagingTimeoutFactory.instance.reset();
+ if (pooledExecutor != null)
+ {
+ pooledExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
+ }
+
log.info("JMS " + this + " stopped");
}
catch (Throwable t)
@@ -463,6 +476,53 @@
this.defaultExpiryQueueObjectName = on;
}
+ /**
+ * Sets the maximum number of threads used by the session pooled executor.
+ *
+ * A zero or negative value means 'do not use a pooled executor', which translates into each
+ * session endpoint creating and using its own QueuedExecutor. This is the default configuration
+ * and it is consistent with the behavior of the previous releases. However, this setting may
+ * lead to idle thread build up.
+ *
+ * It is possible to decrease the maximum number of threads at runtime, by calling this method
+ * with a value that is smaller than the current maximum thread pool size. Decreasing the pool
+ * size will not immediately kill existing threads, but they may later die when idle.
+ *
+ * Setting the maximum thread pool size to 0 when the server peer was started with a non-zero maximum
+ * number of threads, or increasing the maximum number of threads at runtime after starting the
+ * server peer with a 0 value, leads to unspecified behavior.
+ *
+ * See JBMESSAGING-1356.
+ *
+ * @since 1.4.0.SP3.CP3
+ */
+ public synchronized void setMaximumThreadPoolSize(int i)
+ {
+ if (pooledExecutor != null)
+ {
+ pooledExecutor.setMaximumPoolSize(i);
+ }
+ else
+ {
+ this.maximumThreadPoolSize = i;
+ }
+ }
+
+ /**
+ * @see #setMaximumThreadPoolSize
+ */
+ public synchronized int getMaximumThreadPoolSize()
+ {
+ if (pooledExecutor != null)
+ {
+ return pooledExecutor.getMaximumPoolSize();
+ }
+ else
+ {
+ return this.maximumThreadPoolSize;
+ }
+ }
+
// Instance access
public Object getInstance()
@@ -1384,9 +1444,15 @@
return "ServerPeer[" + getServerPeerID() + "]";
}
+ /**
+ *
+ * @return a pooled executor to be shared among this server peer instance's session endpoints.
+ * The method may return null, in which case each session endpoint creates its own
+ * serial executor (this may lead to a large number of threads).
+ */
public PooledExecutor getPooledExecutor()
{
- return new PooledExecutor(10);
+ return pooledExecutor;
}
// Package protected ----------------------------------------------------------------------------
Added: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/NoPooledExecutorTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/NoPooledExecutorTest.java (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/NoPooledExecutorTest.java 2008-06-17 01:35:38 UTC (rev 4493)
@@ -0,0 +1,60 @@
+package org.jboss.test.messaging.jms;
+
+import org.jboss.logging.Logger;
+import org.jboss.jms.server.ServerPeer;
+import org.jboss.test.messaging.tools.ServerManagement;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.DeliveryMode;
+import javax.jms.TextMessage;
+import javax.jms.MessageConsumer;
+
+/**
+ * Tests the default configuration for a server peer: no pooled executor.
+ *
+ * See http://jira.jboss.com/jira/browse/JBMESSAGING-1356.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class NoPooledExecutorTest extends JMSTestCase
+{
+ // Constants -----------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(NoPooledExecutorTest.class);
+
+ // Static --------------------------------------------------------------------------------------
+
+ // Attributes ----------------------------------------------------------------------------------
+
+ // Constructors --------------------------------------------------------------------------------
+
+ public NoPooledExecutorTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------------------------------------
+
+ public void testConfigurationNoPooledExecutor() throws Exception
+ {
+ // test default configuration - no server pool
+ ServerPeer sp = ServerManagement.getServer().getServerPeer();
+ assertEquals(0, sp.getMaximumThreadPoolSize());
+ assertNull(sp.getPooledExecutor());
+ }
+
+ // Package protected ---------------------------------------------------------------------------
+
+ // Protected -----------------------------------------------------------------------------------
+
+ // Private -------------------------------------------------------------------------------------
+
+ // Inner classes -------------------------------------------------------------------------------
+}
Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java 2008-06-17 01:28:31 UTC (rev 4492)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java 2008-06-17 01:35:38 UTC (rev 4493)
@@ -1,5 +1,10 @@
package org.jboss.test.messaging.jms;
+import org.jboss.logging.Logger;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.LocalTestServer;
+import org.jboss.jms.server.ServerPeer;
+
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.MessageProducer;
@@ -7,6 +12,18 @@
import javax.jms.TextMessage;
import javax.jms.MessageConsumer;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+import java.net.URL;
+import java.net.URI;
+import java.io.File;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.InputStream;
+import java.io.FileReader;
+import java.io.PrintWriter;
+import java.io.FileWriter;
+
/**
* This is a container for all test cases related to
* http://jira.jboss.com/jira/browse/JBMESSAGING-1356: an attempt to minimize the number of threads
@@ -22,10 +39,14 @@
{
// Constants -----------------------------------------------------------------------------------
+ private static final Logger log = Logger.getLogger(PooledThreadsDeliveryTest.class);
+
// Static --------------------------------------------------------------------------------------
// Attributes ----------------------------------------------------------------------------------
+ private int maxPoolSize = 20;
+
// Constructors --------------------------------------------------------------------------------
public PooledThreadsDeliveryTest(String name)
@@ -35,8 +56,19 @@
// Public --------------------------------------------------------------------------------------
+ public void testConfigurationForPooledExecutor() throws Exception
+ {
+ ServerPeer sp = ServerManagement.getServer().getServerPeer();
+ assertEquals(maxPoolSize, sp.getMaximumThreadPoolSize());
+ PooledExecutor pe = sp.getPooledExecutor();
+ assertEquals(maxPoolSize, pe.getMaximumPoolSize());
+ }
+
public void testOneMessage() throws Exception
{
+ PooledExecutor pe = ServerManagement.getServer().getServerPeer().getPooledExecutor();
+ assertEquals(maxPoolSize, pe.getMaximumPoolSize());
+
Connection conn = null;
try
@@ -75,6 +107,9 @@
public void testMessageSequence() throws Exception
{
+ PooledExecutor pe = ServerManagement.getServer().getServerPeer().getPooledExecutor();
+ assertEquals(maxPoolSize, pe.getMaximumPoolSize());
+
Connection conn = null;
try
@@ -129,7 +164,100 @@
// Protected -----------------------------------------------------------------------------------
+ /**
+ * We start the server peer with a non-zero maximum number of threads.
+ */
+ @Override
+ protected void setUp() throws Exception
+ {
+ configureMaximumThreadPoolSize();
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ unconfigureMaximumThreadPoolSize();
+ super.tearDown();
+ }
+
// Private -------------------------------------------------------------------------------------
+ private void configureMaximumThreadPoolSize() throws Exception
+ {
+ // change the attribute definition in a copy of the default configuration file
+ URL url = getClass().getClassLoader().getResource(LocalTestServer.DEFAULT_MAIN_CONFIG_FILE);
+
+ if (url == null)
+ {
+ fail("cannot find " + LocalTestServer.DEFAULT_MAIN_CONFIG_FILE + " in classpath");
+ }
+
+ File orig = new File(url.getFile());
+ File modified = File.createTempFile("server.peer.config-", ".xml");
+ modified.deleteOnExit();
+
+ BufferedReader br = null;
+ FileReader fr = null;
+ PrintWriter pw = null;
+ FileWriter fw = null;
+
+ try
+ {
+ fr = new FileReader(orig);
+ br = new BufferedReader(fr);
+ fw = new FileWriter(modified);
+ pw = new PrintWriter(fw);
+ String line = null;
+ boolean done = false;
+ while((line = br.readLine()) != null)
+ {
+ if (!done && line.indexOf("<depends") != -1)
+ {
+ pw.println(" <attribute name=\"MaximumThreadPoolSize\">" + maxPoolSize +
+ "</attribute>");
+ pw.println();
+ done = true;
+ }
+
+ pw.println(line);
+ }
+ }
+ finally
+ {
+ if (pw != null)
+ {
+ pw.close();
+ }
+
+ if (fw != null)
+ {
+ fw.close();
+ }
+
+ if (br != null)
+ {
+ br.close();
+ }
+
+ if (fr != null)
+ {
+ fr.close();
+ }
+ }
+
+ System.setProperty("test.server.peer.configuration.file", modified.getAbsolutePath());
+ }
+
+ private void unconfigureMaximumThreadPoolSize() throws Exception
+ {
+ String fileName = System.getProperty("test.server.peer.configuration.file");
+ System.clearProperty("test.server.peer.configuration.file");
+
+ // delete the temporary file (even if I configured it to be deleted on exit)
+ File tmp = new File(fileName);
+ assertTrue(tmp.delete());
+ }
+
// Inner classes -------------------------------------------------------------------------------
}
More information about the jboss-cvs-commits
mailing list