[jboss-cvs] JBoss Messaging SVN: r4493 - in branches/Branch_Experimental_JBMESSAGING_1356_2: src/etc/xmdesc and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 16 21:35:39 EDT 2008


Author: ovidiu.feodorov at jboss.com
Date: 2008-06-16 21:35:38 -0400 (Mon, 16 Jun 2008)
New Revision: 4493

Added:
   branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/NoPooledExecutorTest.java
Modified:
   branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/server/default/deploy/messaging-service.xml
   branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/xmdesc/ServerPeer-xmbean.xml
   branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java
Log:
Modified ServerPeer to accept a MaximumThreadPoolSize JMX attribute. If not specified, or 0, the behavior is consistent with the behavior of the previous versions: a QueuedExecutor per session endpoint

Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/server/default/deploy/messaging-service.xml
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/server/default/deploy/messaging-service.xml	2008-06-17 01:28:31 UTC (rev 4492)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/server/default/deploy/messaging-service.xml	2008-06-17 01:35:38 UTC (rev 4493)
@@ -99,6 +99,14 @@
       <attribute name="SuckerPassword"></attribute>
       -->
 
+       <!--
+           The number of threads shared by all session enpoints associated with this server peer.
+           Default is 0 - no pool, each session endpoint creates its own queued executor.
+       -->
+       <!--
+       <attribute name="MaximumThreadPoolSize">20</attribute>
+       -->
+
       <depends optional-attribute-name="PersistenceManager">jboss.messaging:service=PersistenceManager</depends>
       
       <depends optional-attribute-name="JMSUserManager">jboss.messaging:service=JMSUserManager</depends>

Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/xmdesc/ServerPeer-xmbean.xml	2008-06-17 01:28:31 UTC (rev 4492)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/etc/xmdesc/ServerPeer-xmbean.xml	2008-06-17 01:35:38 UTC (rev 4493)
@@ -222,8 +222,13 @@
       <description>The password used for message suckers</description>
       <name>SuckerPassword</name>
       <type>java.lang.String</type>
-   </attribute>   
+   </attribute>
 
+   <attribute access="read-write" getMethod="getMaximumThreadPoolSize" setMethod="setMaximumThreadPoolSize">
+      <description>The maximum number of threads shared by session endpoints. Zero means no pool, each endpoint has its own thread.</description>
+      <name>MaximumThreadPoolSize</name>
+      <type>int</type>
+   </attribute>
 
    <!-- Managed operations -->
 

Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java	2008-06-17 01:28:31 UTC (rev 4492)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java	2008-06-17 01:35:38 UTC (rev 4493)
@@ -161,6 +161,8 @@
    //From a system property - this overrides
    private boolean strictTckProperty;
 
+   private int maximumThreadPoolSize;
+
    // wired components
 
    private DestinationJNDIMapper destinationJNDIMapper;
@@ -178,6 +180,7 @@
    private ClusterNotifier clusterNotifier;
    private FailoverWaiter failoverWaiter;
    private RotatingID messageIDMgr;
+   private PooledExecutor pooledExecutor;
 
    // plugins
 
@@ -229,6 +232,11 @@
 
          log.debug(this + " starting");
 
+         if (maximumThreadPoolSize > 0)
+         {
+             pooledExecutor = new PooledExecutor(maximumThreadPoolSize);
+         }
+
          loadClientAOPConfig();
 
          loadServerAOPConfig();
@@ -386,6 +394,11 @@
 
          MessagingTimeoutFactory.instance.reset();
 
+         if (pooledExecutor != null)
+         {
+            pooledExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
+         }
+
          log.info("JMS " + this + " stopped");
       }
       catch (Throwable t)
@@ -463,6 +476,53 @@
       this.defaultExpiryQueueObjectName = on;
    }
 
+   /**
+    * Set the maximum number of threads to use by the session pooled executor.
+    *
+    * A zero or negative value means 'do not use a pooled executor', which translates into each
+    * session endopoint creating and using its own QueuedExecutor. This is the default configuration
+    * and it is consistent with the behavior of the previous releases. However, this setting may
+    * lead to idle thread build up.
+    *
+    * It is possible to decrease the maximum number of threads at runtime, by calling this method
+    * with a value that is smaller than the current maximum thread pool size. Decreasing the pool
+    * size will not immediately kill existing threads, but they may later die when idle.
+    *
+    * Setting maxium thread pool size to 0 when the server peer was started with a non-zero maximum
+    * number of threads, or incresing the maximum number of threads at runtime after starting the
+    * server peer with a 0 value leads to unspecified behavior.
+    *
+    * See JBMESSAGING-1356.
+    *
+    * @since 1.4.0.SP3.CP3
+    */
+   public synchronized void setMaximumThreadPoolSize(int i)
+   {
+       if (pooledExecutor != null)
+       {
+           pooledExecutor.setMaximumPoolSize(i);
+       }
+       else
+       {
+           this.maximumThreadPoolSize = i;
+       }
+   }
+
+   /**
+    * @see #setMaximumThreadPoolSize
+    */
+   public synchronized int getMaximumThreadPoolSize()
+   {
+       if (pooledExecutor != null)
+       {
+           return pooledExecutor.getMaximumPoolSize();
+       }
+       else
+       {
+           return this.maximumThreadPoolSize;
+       }
+   }
+
    // Instance access
 
    public Object getInstance()
@@ -1384,9 +1444,15 @@
       return "ServerPeer[" + getServerPeerID() + "]";
    }
 
+    /**
+     *
+     * @return a pooled executor to be shared among this server peer instance's session endpoints.
+     *         The method may return null, in which case each session enpoint creates its own
+     *         serial executor (this may lead to a large thread number).
+     */
     public PooledExecutor getPooledExecutor()
     {
-        return new PooledExecutor(10);
+        return pooledExecutor;
     }
 
    // Package protected ----------------------------------------------------------------------------

Added: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/NoPooledExecutorTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/NoPooledExecutorTest.java	                        (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/NoPooledExecutorTest.java	2008-06-17 01:35:38 UTC (rev 4493)
@@ -0,0 +1,60 @@
+package org.jboss.test.messaging.jms;
+
+import org.jboss.logging.Logger;
+import org.jboss.jms.server.ServerPeer;
+import org.jboss.test.messaging.tools.ServerManagement;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.DeliveryMode;
+import javax.jms.TextMessage;
+import javax.jms.MessageConsumer;
+
+/**
+ * Tests the default configuration for a server peer: no pooled executor.
+ *
+ * See http://jira.jboss.com/jira/browse/JBMESSAGING-1356.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class NoPooledExecutorTest extends JMSTestCase
+{
+    // Constants -----------------------------------------------------------------------------------
+
+    private static final Logger log = Logger.getLogger(NoPooledExecutorTest.class);
+
+    // Static --------------------------------------------------------------------------------------
+
+    // Attributes ----------------------------------------------------------------------------------
+
+    // Constructors --------------------------------------------------------------------------------
+
+    public NoPooledExecutorTest(String name)
+    {
+       super(name);
+    }
+
+    // Public --------------------------------------------------------------------------------------
+
+    public void testConfigurationNoPooledExecutor() throws Exception
+    {
+        // test default configuration - no server pool
+        ServerPeer sp = ServerManagement.getServer().getServerPeer();
+        assertEquals(0, sp.getMaximumThreadPoolSize());
+        assertNull(sp.getPooledExecutor());
+    }
+
+    // Package protected ---------------------------------------------------------------------------
+
+    // Protected -----------------------------------------------------------------------------------
+
+    // Private -------------------------------------------------------------------------------------
+
+    // Inner classes -------------------------------------------------------------------------------
+}

Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java	2008-06-17 01:28:31 UTC (rev 4492)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java	2008-06-17 01:35:38 UTC (rev 4493)
@@ -1,5 +1,10 @@
 package org.jboss.test.messaging.jms;
 
+import org.jboss.logging.Logger;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.LocalTestServer;
+import org.jboss.jms.server.ServerPeer;
+
 import javax.jms.Connection;
 import javax.jms.Session;
 import javax.jms.MessageProducer;
@@ -7,6 +12,18 @@
 import javax.jms.TextMessage;
 import javax.jms.MessageConsumer;
 
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+import java.net.URL;
+import java.net.URI;
+import java.io.File;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.InputStream;
+import java.io.FileReader;
+import java.io.PrintWriter;
+import java.io.FileWriter;
+
 /**
  * This is a container for all test cases related to
  * http://jira.jboss.com/jira/browse/JBMESSAGING-1356: an attempt to minimize the number of threads 
@@ -22,10 +39,14 @@
 {
     // Constants -----------------------------------------------------------------------------------
 
+    private static final Logger log = Logger.getLogger(PooledThreadsDeliveryTest.class);
+
     // Static --------------------------------------------------------------------------------------
 
     // Attributes ----------------------------------------------------------------------------------
 
+    private int maxPoolSize = 20;
+
     // Constructors --------------------------------------------------------------------------------
 
     public PooledThreadsDeliveryTest(String name)
@@ -35,8 +56,19 @@
 
     // Public --------------------------------------------------------------------------------------
 
+    public void testConfigurationForPooledExecutor() throws Exception
+    {
+        ServerPeer sp = ServerManagement.getServer().getServerPeer();
+        assertEquals(maxPoolSize, sp.getMaximumThreadPoolSize());
+        PooledExecutor pe = sp.getPooledExecutor();
+        assertEquals(maxPoolSize, pe.getMaximumPoolSize());
+    }
+
     public void testOneMessage() throws Exception
     {
+        PooledExecutor pe = ServerManagement.getServer().getServerPeer().getPooledExecutor();
+        assertEquals(maxPoolSize, pe.getMaximumPoolSize());
+
         Connection conn = null;
 
         try
@@ -75,6 +107,9 @@
 
     public void testMessageSequence() throws Exception
     {
+        PooledExecutor pe = ServerManagement.getServer().getServerPeer().getPooledExecutor();
+        assertEquals(maxPoolSize, pe.getMaximumPoolSize());
+
         Connection conn = null;
 
         try
@@ -129,7 +164,100 @@
 
     // Protected -----------------------------------------------------------------------------------
 
+    /**
+     * We start the server peer with a non-zero maximum number of threads.
+     */
+    @Override
+    protected void setUp() throws Exception
+    {
+        configureMaximumThreadPoolSize();
+        super.setUp();
+    }
+
+    @Override
+    protected void tearDown() throws Exception
+    {
+        unconfigureMaximumThreadPoolSize();
+        super.tearDown();
+    }
+
     // Private -------------------------------------------------------------------------------------
 
+    private void configureMaximumThreadPoolSize() throws Exception
+    {
+        // change the attribute definition in a copy of the default configuration file
+        URL url = getClass().getClassLoader().getResource(LocalTestServer.DEFAULT_MAIN_CONFIG_FILE);
+
+        if (url == null)
+        {
+            fail("cannot find " + LocalTestServer.DEFAULT_MAIN_CONFIG_FILE + " in classpath");
+        }
+
+        File orig = new File(url.getFile());
+        File modified = File.createTempFile("server.peer.config-", ".xml");
+        modified.deleteOnExit();
+
+        BufferedReader br = null;
+        FileReader fr = null;
+        PrintWriter pw = null;
+        FileWriter fw = null;
+
+        try
+        {
+            fr = new FileReader(orig);
+            br = new BufferedReader(fr);
+            fw = new FileWriter(modified);
+            pw = new PrintWriter(fw);
+            String line = null;
+            boolean done = false;
+            while((line = br.readLine()) != null)
+            {
+                if (!done && line.indexOf("<depends") != -1)
+                {
+                    pw.println("    <attribute name=\"MaximumThreadPoolSize\">" + maxPoolSize +
+                               "</attribute>");
+                    pw.println();
+                    done = true;
+                }
+
+                pw.println(line);
+            }
+        }
+        finally
+        {
+            if (pw != null)
+            {
+                pw.close();
+            }
+
+            if (fw != null)
+            {
+                fw.close();
+            }
+
+            if (br != null)
+            {
+                br.close();
+            }
+
+            if (fr != null)
+            {
+                fr.close();
+            }
+        }
+
+        System.setProperty("test.server.peer.configuration.file", modified.getAbsolutePath());
+    }
+
+    private void unconfigureMaximumThreadPoolSize() throws Exception
+    {
+        String fileName = System.getProperty("test.server.peer.configuration.file");
+        System.clearProperty("test.server.peer.configuration.file");
+
+        // delete the temporary file (even if I configured it to be deleted on exit)
+        File tmp = new File(fileName);
+        assertTrue(tmp.delete());
+    }
+
     // Inner classes -------------------------------------------------------------------------------
 }




More information about the jboss-cvs-commits mailing list