[jbosscache-commits] JBoss Cache SVN: r7130 - in core/trunk/src: main/java/org/jboss/cache/config and 8 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Thu Nov 13 12:50:09 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-11-13 12:50:09 -0500 (Thu, 13 Nov 2008)
New Revision: 7130

Added:
   core/trunk/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java
Modified:
   core/trunk/src/main/docbook/userguide/en/modules/basic_api.xml
   core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml
   core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
   core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java
   core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
   core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
   core/trunk/src/main/resources/config-samples/all.xml
   core/trunk/src/main/resources/schema/jbosscache-config-3.0.xsd
   core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java
   core/trunk/src/test/resources/configs/parser-test-async.xml
Log:
JBCACHE-1443: Async marshalling, async cache loading and async notification handling use unbounded queues

Modified: core/trunk/src/main/docbook/userguide/en/modules/basic_api.xml
===================================================================
--- core/trunk/src/main/docbook/userguide/en/modules/basic_api.xml	2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/docbook/userguide/en/modules/basic_api.xml	2008-11-13 17:50:09 UTC (rev 7130)
@@ -552,7 +552,8 @@
             the event.  As such, it is good practise to ensure cache listener implementations don't hold up the thread in
             long-running tasks.  Alternatively, you could set the <literal>CacheListener.sync()</literal> attribute to
             <literal>false</literal>, in which case you will not be notified in the caller's thread.  See the
-            <link linkend="element.listeners">configuration reference</link> on tuning this thread pool.
+            <link linkend="element.listeners">configuration reference</link> on tuning this thread pool and the size of its
+             blocking queue.
          </para>
       </section>
    </section>

Modified: core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml
===================================================================
--- core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml	2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml	2008-11-13 17:50:09 UTC (rev 7130)
@@ -56,7 +56,8 @@
       Used to define async listener notification thread pool size
    -->
    <listeners
-         asyncPoolSize="1"/>
+         asyncPoolSize="1"
+         asyncQueueSize="1000000"/>
 
    <!--
       Used to enable invocation batching and allow the use of Cache.startBatch()/endBatch() methods.
@@ -93,7 +94,7 @@
       <!--
          Uncomment this for async replication.
       -->
-      <!-- <async useReplQueue="true" replQueueInterval="10000" replQueueMaxElements="500" serializationExecutorPoolSize="20" /> -->
+      <!--<async useReplQueue="true" replQueueInterval="10000" replQueueMaxElements="500" serializationExecutorPoolSize="20" serializationExecutorQueueSize="5000000"/>-->
 
       <!-- Uncomment to use Buddy Replication -->
       <!--
@@ -988,6 +989,19 @@
                      treated as synchronous listeners and notified synchronously.
                   </entry>
                </row>
+
+                <row>
+                  <entry><emphasis role="bold">asyncQueueSize</emphasis></entry>
+                  <entry>listenerAsyncQueueSize</entry>
+                  <entry>integer</entry>
+                  <entry>1000000</entry>
+
+                  <entry>
+                     The size of the bounded queue used by the async listener threadpool.  Only considered if
+                     <literal>asyncPoolSize</literal> is greater than 0.  Increase this if you see a lot of threads
+                     blocking trying to add events to this queue.
+                  </entry>
+               </row>
             </tbody>
          </tgroup>
       </table>
@@ -2763,6 +2777,17 @@
                      serialization does not happen asynchronously.
                   </entry>
                </row>
+                <row>
+                  <entry><emphasis role="bold">serializationExecutorQueueSize</emphasis></entry>
+                  <entry>serializationExecutorQueueSize</entry>
+                  <entry>positive integer</entry>
+                  <entry>1000000</entry>
+                  <entry>
+                     This is used to define the size of the bounded queue that holds tasks for the serialization executor.
+                     This is ignored if a serialization executor is not used, such as when <literal>serializationExecutorPoolSize</literal>
+                     is less than 1.
+                  </entry>
+               </row>
                <row>
                   <entry><emphasis role="bold">useReplQueue</emphasis></entry>
                   <entry>useReplQueue</entry>

Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java	2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java	2008-11-13 17:50:09 UTC (rev 7130)
@@ -224,7 +224,9 @@
    private boolean writeSkewCheck = false;
    private int concurrencyLevel = 500;
    private int listenerAsyncPoolSize = 1;
-   private int serializationExecutorPoolSize = 25;
+   private int listenerAsyncQueueSize = 1000000;
+   private int serializationExecutorPoolSize = 0;
+   private int serializationExecutorQueueSize = 1000000;
 
    @Start(priority = 1)
    void correctIsolationLevels()
@@ -455,10 +457,32 @@
     */
    public void setListenerAsyncPoolSize(int listenerAsyncPoolSize)
    {
-      testImmutability("asyncListenerPoolSize");
+      testImmutability("listenerAsyncPoolSize");
       this.listenerAsyncPoolSize = listenerAsyncPoolSize;
    }
 
+   /**
+    * Sets the size of the bounded queue used to hold async listener events.  This defaults to 1,000,000.
+    *
+    * @param listenerAsyncQueueSize queue size to use
+    */
+   public void setListenerAsyncQueueSize(int listenerAsyncQueueSize)
+   {
+      testImmutability("listenerAsyncQueueSize");
+      this.listenerAsyncQueueSize = listenerAsyncQueueSize;
+   }
+
+   /**
+    * Sets the size of the bounded queue used to hold async serialization tasks.  This defaults to 1,000,000.
+    * 
+    * @param serializationExecutorQueueSize queue size to use
+    */
+   public void setSerializationExecutorQueueSize(int serializationExecutorQueueSize)
+   {
+      testImmutability("serializationExecutorQueueSize");
+      this.serializationExecutorQueueSize = serializationExecutorQueueSize;
+   }
+
    public void setBuddyReplicationConfig(BuddyReplicationConfig config)
    {
       testImmutability("buddyReplicationConfig");
@@ -921,6 +945,24 @@
       return serializationExecutorPoolSize;
    }
 
+   /**
+    *
+    * @return the bounded queue size for async listeners
+    */
+   public int getListenerAsyncQueueSize()
+   {
+      return listenerAsyncQueueSize;
+   }
+
+   /**
+    *
+    * @return the bounded queue size for async serializers
+    */
+   public int getSerializationExecutorQueueSize()
+   {
+      return serializationExecutorQueueSize;
+   }
+
    // ------------------------------------------------------------------------------------------------------------
    //   HELPERS
    // ------------------------------------------------------------------------------------------------------------
@@ -977,6 +1019,8 @@
       if (listenerAsyncPoolSize != that.listenerAsyncPoolSize) return false;
       if (serializationExecutorPoolSize != that.serializationExecutorPoolSize) return false;
       if (jgroupsConfigFile != that.jgroupsConfigFile) return false;
+      if (listenerAsyncQueueSize != that.listenerAsyncQueueSize) return false;
+      if (serializationExecutorQueueSize != that.serializationExecutorQueueSize) return false;
 
       return true;
    }
@@ -1018,6 +1062,9 @@
       result = 31 * result + objectInputStreamPoolSize;
       result = 31 * result + objectOutputStreamPoolSize;
       result = 31 * result + serializationExecutorPoolSize;
+      result = 31 * result + listenerAsyncPoolSize;
+      result = 31 * result + serializationExecutorQueueSize;
+      result = 31 * result + listenerAsyncQueueSize;
       result = 31 * result + (jgroupsConfigFile != null ? jgroupsConfigFile.hashCode() : 0);
       return result;
    }

Modified: core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java	2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java	2008-11-13 17:50:09 UTC (rev 7130)
@@ -257,17 +257,10 @@
    {
       if (element == null) return; //this element is optional
       String asyncPoolSizeStr = getAttributeValue(element, "asyncPoolSize");
-      if (existsAttribute(asyncPoolSizeStr))
-      {
-         try
-         {
-            config.setListenerAsyncPoolSize(Integer.parseInt(asyncPoolSizeStr));
-         }
-         catch (NumberFormatException nfe)
-         {
-            throw new ConfigurationException("Unable to parse the asyncPoolSize attribute of the listeners element.  Was [" + asyncPoolSizeStr + "]");
-         }
-      }
+      if (existsAttribute(asyncPoolSizeStr)) config.setListenerAsyncPoolSize(getInt(asyncPoolSizeStr));
+
+      String asyncQueueSizeStr = getAttributeValue(element, "asyncQueueSize");
+      if (existsAttribute(asyncQueueSizeStr)) config.setListenerAsyncQueueSize(getInt(asyncQueueSizeStr));      
    }
 
    private void configureInvocationBatching(Element element)
@@ -380,10 +373,15 @@
       String replQueueInterval = getAttributeValue(element, "replQueueInterval");
       if (existsAttribute(replQueueInterval)) config.setReplQueueInterval(getLong(replQueueInterval));
       String replQueueMaxElements = getAttributeValue(element, "replQueueMaxElements");
+
       if (existsAttribute(replQueueMaxElements)) config.setReplQueueMaxElements(getInt(replQueueMaxElements));
       String serializationExecutorPoolSize = getAttributeValue(element, "serializationExecutorPoolSize");
       if (existsAttribute(serializationExecutorPoolSize))
          config.setSerializationExecutorPoolSize(getInt(serializationExecutorPoolSize));
+
+      String serializationExecutorQueueSize = getAttributeValue(element, "serializationExecutorQueueSize");
+      if (existsAttribute(serializationExecutorQueueSize))
+         config.setSerializationExecutorQueueSize(getInt(serializationExecutorQueueSize));
    }
 
    private void configureLocking(Element element)

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java	2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java	2008-11-13 17:50:09 UTC (rev 7130)
@@ -31,6 +31,7 @@
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.interceptors.InterceptorChain;
 import org.jboss.cache.invocation.InvocationContextContainer;
+import org.jboss.cache.util.concurrent.BoundedExecutors;
 import org.jboss.cache.util.concurrent.WithinThreadExecutor;
 import org.jgroups.Address;
 import org.jgroups.Channel;
@@ -48,7 +49,6 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -100,14 +100,15 @@
          if (replicationProcessor == null)
          {
             replicationProcessorCount = new AtomicInteger(0);
-            replicationProcessor = Executors.newFixedThreadPool(c.isUseReplQueue() ? 1 : c.getSerializationExecutorPoolSize(),
+
+            replicationProcessor = BoundedExecutors.newFixedThreadPool(c.isUseReplQueue() ? 1 : c.getSerializationExecutorPoolSize(),
                   new ThreadFactory()
                   {
                      public Thread newThread(Runnable r)
                      {
                         return new Thread(r, "AsyncReplicationProcessor-" + replicationProcessorCount.incrementAndGet());
                      }
-                  }
+                  }, c.getSerializationExecutorQueueSize()
             );
          }
       }

Modified: core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java	2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java	2008-11-13 17:50:09 UTC (rev 7130)
@@ -40,6 +40,7 @@
 import org.jboss.cache.notifications.event.*;
 import static org.jboss.cache.notifications.event.Event.Type.*;
 import org.jboss.cache.util.Immutables;
+import org.jboss.cache.util.concurrent.BoundedExecutors;
 import org.jboss.cache.util.concurrent.WithinThreadExecutor;
 import org.jgroups.View;
 
@@ -56,7 +57,6 @@
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -169,13 +169,13 @@
          // create one if needed
          if (config.getListenerAsyncPoolSize() > 0)
          {
-            asyncProcessor = Executors.newFixedThreadPool(config.getListenerAsyncPoolSize(), new ThreadFactory()
+            asyncProcessor = BoundedExecutors.newFixedThreadPool(config.getListenerAsyncPoolSize(), new ThreadFactory()
             {
                public Thread newThread(Runnable r)
                {
                   return new Thread(r, "AsyncNotifier-" + asyncNotifierThreadNumber.getAndIncrement());
                }
-            });
+            }, config.getListenerAsyncQueueSize());
          }
          else
          {

Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java	2008-11-13 17:50:09 UTC (rev 7130)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.cache.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Similar to JDK {@link java.util.concurrent.Executors} except that the factory methods here allow you to specify the
+ * size of the blocking queue that backs the executor.
+ *
+ * @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik AT jboss DOT org</a>)
+ * @since 3.0
+ */
+public class BoundedExecutors
+{
+   /**
+    * Creates a thread pool that reuses a fixed set of threads
+    * operating off a shared bounded queue. If any thread
+    * terminates due to a failure during execution prior to shutdown,
+    * a new one will take its place if needed to execute subsequent
+    * tasks.
+    *
+    * @param nThreads         the number of threads in the pool
+    * @param boundedQueueSize size of the bounded queue
+    * @return the newly created thread pool
+    */
+   public static ExecutorService newFixedThreadPool(int nThreads, int boundedQueueSize)
+   {
+      return new ThreadPoolExecutor(nThreads, nThreads,
+            0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(boundedQueueSize));
+   }
+
+   /**
+    * Creates a thread pool that reuses a fixed set of threads
+    * operating off a shared bounded queue, using the provided
+    * ThreadFactory to create new threads when needed.
+    *
+    * @param nThreads         the number of threads in the pool
+    * @param threadFactory    the factory to use when creating new threads
+    * @param boundedQueueSize size of the bounded queue
+    * @return the newly created thread pool
+    */
+   public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory, int boundedQueueSize)
+   {
+      return new ThreadPoolExecutor(nThreads, nThreads,
+            0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(boundedQueueSize),
+            threadFactory);
+   }
+}

Modified: core/trunk/src/main/resources/config-samples/all.xml
===================================================================
--- core/trunk/src/main/resources/config-samples/all.xml	2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/resources/config-samples/all.xml	2008-11-13 17:50:09 UTC (rev 7130)
@@ -46,7 +46,8 @@
       Used to define async listener notification thread pool size
    -->
    <listeners
-         asyncPoolSize="1"/>
+         asyncPoolSize="1"
+         asyncQueueSize="1000000"/>
 
    <!--
       Used to enable invocation batching and allow the use of Cache.startBatch()/endBatch() methods.
@@ -83,7 +84,7 @@
       <!--
          Uncomment this for async replication.
       -->
-      <!-- <async useReplQueue="true" replQueueInterval="10000" replQueueMaxElements="500" serializationExecutorPoolSize="20" /> -->
+      <!--<async useReplQueue="true" replQueueInterval="10000" replQueueMaxElements="500" serializationExecutorPoolSize="20" serializationExecutorQueueSize="5000000"/>-->
 
       <!-- Uncomment to use Buddy Replication -->
       <!--

Modified: core/trunk/src/main/resources/schema/jbosscache-config-3.0.xsd
===================================================================
--- core/trunk/src/main/resources/schema/jbosscache-config-3.0.xsd	2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/resources/schema/jbosscache-config-3.0.xsd	2008-11-13 17:50:09 UTC (rev 7130)
@@ -142,6 +142,7 @@
 
    <xs:complexType name="listenersType">
       <xs:attribute name="asyncPoolSize" type="tns:positiveInteger"/>
+      <xs:attribute name="asyncQueueSize" type="tns:positiveInteger"/>
    </xs:complexType>
 
    <xs:complexType name="invocationBatchingType">
@@ -165,6 +166,7 @@
       <xs:attribute name="replQueueInterval" type="tns:positiveInteger"/>
       <xs:attribute name="replQueueMaxElements" type="tns:positiveInteger"/>
       <xs:attribute name="serializationExecutorPoolSize" type="tns:positiveInteger"/>
+      <xs:attribute name="serializationExecutorQueueSize" type="tns:positiveInteger"/>
    </xs:complexType>
 
    <xs:complexType name="evictionType">

Modified: core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java	2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java	2008-11-13 17:50:09 UTC (rev 7130)
@@ -74,6 +74,7 @@
    public void testAsyncSerializationExecutorSize()
    {
       assert asyncConfig.getSerializationExecutorPoolSize() == 250;
+      assert asyncConfig.getSerializationExecutorQueueSize() == 5000000;
    }
 
    public void testUseReplQueue()
@@ -284,6 +285,10 @@
    public void testListenerAsyncThreads()
    {
       assert syncConfig.getListenerAsyncPoolSize() == 5;
+      assert syncConfig.getListenerAsyncQueueSize() == 1000000; // the default
+
+      assert asyncConfig.getListenerAsyncPoolSize() == 5;
+      assert asyncConfig.getListenerAsyncQueueSize() == 100000;
    }
 
    public void testInvocationBatching()

Modified: core/trunk/src/test/resources/configs/parser-test-async.xml
===================================================================
--- core/trunk/src/test/resources/configs/parser-test-async.xml	2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/test/resources/configs/parser-test-async.xml	2008-11-13 17:50:09 UTC (rev 7130)
@@ -16,7 +16,7 @@
 
    <startup regionsInactiveOnStartup="true"/>
    <shutdown hookBehavior="REGISTER"/>
-   <listeners asyncPoolSize="5"/>
+   <listeners asyncPoolSize="5" asyncQueueSize="100000"/>
    <invocationBatching enabled="true"/>
 
    <!-- serialization related configuration, used for replication and cache loading -->
@@ -24,7 +24,7 @@
                   marshallerClass="some.Clazz" useLazyDeserialization="true" useRegionBasedMarshalling="true"/>
 
    <clustering mode="replication" clusterName="JBossCache-cluster">
-      <async useReplQueue="false" serializationExecutorPoolSize="250"/>
+      <async useReplQueue="false" serializationExecutorPoolSize="250" serializationExecutorQueueSize="5000000"/>
       <stateRetrieval timeout="15124" fetchInMemoryState="true"/>
       <buddy enabled="true" poolName="myBuddyPoolReplicationGroup" communicationTimeout="2000">
          <dataGravitation auto="true" removeOnFind="true" searchBackupTrees="true"/>




More information about the jbosscache-commits mailing list