[jbosscache-commits] JBoss Cache SVN: r7130 - in core/trunk/src: main/java/org/jboss/cache/config and 8 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Thu Nov 13 12:50:09 EST 2008
Author: manik.surtani at jboss.com
Date: 2008-11-13 12:50:09 -0500 (Thu, 13 Nov 2008)
New Revision: 7130
Added:
core/trunk/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java
Modified:
core/trunk/src/main/docbook/userguide/en/modules/basic_api.xml
core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml
core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
core/trunk/src/main/resources/config-samples/all.xml
core/trunk/src/main/resources/schema/jbosscache-config-3.0.xsd
core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java
core/trunk/src/test/resources/configs/parser-test-async.xml
Log:
JBCACHE-1443: Async marshalling, async cache loading and async notification handling use unbounded queues
Modified: core/trunk/src/main/docbook/userguide/en/modules/basic_api.xml
===================================================================
--- core/trunk/src/main/docbook/userguide/en/modules/basic_api.xml 2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/docbook/userguide/en/modules/basic_api.xml 2008-11-13 17:50:09 UTC (rev 7130)
@@ -552,7 +552,8 @@
the event. As such, it is good practise to ensure cache listener implementations don't hold up the thread in
long-running tasks. Alternatively, you could set the <literal>CacheListener.sync()</literal> attribute to
<literal>false</literal>, in which case you will not be notified in the caller's thread. See the
- <link linkend="element.listeners">configuration reference</link> on tuning this thread pool.
+ <link linkend="element.listeners">configuration reference</link> on tuning this thread pool and size of blocking
+ queue.
</para>
</section>
</section>
Modified: core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml
===================================================================
--- core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml 2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml 2008-11-13 17:50:09 UTC (rev 7130)
@@ -56,7 +56,8 @@
Used to define async listener notification thread pool size
-->
<listeners
- asyncPoolSize="1"/>
+ asyncPoolSize="1"
+ asyncQueueSize="1000000"/>
<!--
Used to enable invocation batching and allow the use of Cache.startBatch()/endBatch() methods.
@@ -93,7 +94,7 @@
<!--
Uncomment this for async replication.
-->
- <!-- <async useReplQueue="true" replQueueInterval="10000" replQueueMaxElements="500" serializationExecutorPoolSize="20" /> -->
+ <!--<async useReplQueue="true" replQueueInterval="10000" replQueueMaxElements="500" serializationExecutorPoolSize="20" serializationExecutorQueueSize="5000000"/>-->
<!-- Uncomment to use Buddy Replication -->
<!--
@@ -988,6 +989,19 @@
treated as synchronous listeners and notified synchronously.
</entry>
</row>
+
+ <row>
+ <entry><emphasis role="bold">asyncQueueSize</emphasis></entry>
+ <entry>listenerAsyncQueueSize</entry>
+ <entry>integer</entry>
+ <entry>1000000</entry>
+
+ <entry>
+ The size of the bounded queue used by the async listener threadpool. Only considered if
+ <literal>asyncPoolSize</literal> is greater than 0. Increase this if you see a lot of threads
+ blocking trying to add events to this queue.
+ </entry>
+ </row>
</tbody>
</tgroup>
</table>
@@ -2763,6 +2777,17 @@
serialization does not happen asynchronously.
</entry>
</row>
+ <row>
+ <entry><emphasis role="bold">serializationExecutorQueueSize</emphasis></entry>
+ <entry>serializationExecutorQueueSize</entry>
+ <entry>positive integer</entry>
+ <entry>1000000</entry>
+ <entry>
+ This is used to define the size of the bounded queue that holds tasks for the serialization executor.
+ This is ignored if a serialization executor is not used, such as when <literal>serializationExecutorPoolSize</literal>
+ is less than 1.
+ </entry>
+ </row>
<row>
<entry><emphasis role="bold">useReplQueue</emphasis></entry>
<entry>useReplQueue</entry>
Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-11-13 17:50:09 UTC (rev 7130)
@@ -224,7 +224,9 @@
private boolean writeSkewCheck = false;
private int concurrencyLevel = 500;
private int listenerAsyncPoolSize = 1;
- private int serializationExecutorPoolSize = 25;
+ private int listenerAsyncQueueSize = 1000000;
+ private int serializationExecutorPoolSize = 0;
+ private int serializationExecutorQueueSize = 1000000;
@Start(priority = 1)
void correctIsolationLevels()
@@ -455,10 +457,32 @@
*/
public void setListenerAsyncPoolSize(int listenerAsyncPoolSize)
{
- testImmutability("asyncListenerPoolSize");
+ testImmutability("listenerAsyncPoolSize");
this.listenerAsyncPoolSize = listenerAsyncPoolSize;
}
+ /**
+ * Sets the queue size of the bounded queue used to store async listener events on. This defaults to 1,000,000.
+ *
+ * @param listenerAsyncQueueSize queue size to use
+ */
+ public void setListenerAsyncQueueSize(int listenerAsyncQueueSize)
+ {
+ testImmutability("listenerAsyncQueueSize");
+ this.listenerAsyncQueueSize = listenerAsyncQueueSize;
+ }
+
+ /**
+ * Sets the queue size of the bounded queue used to store async serialization events on. This defaults to 1,000,000.
+ *
+ * @param serializationExecutorQueueSize queue size to use
+ */
+ public void setSerializationExecutorQueueSize(int serializationExecutorQueueSize)
+ {
+ testImmutability("serializationExecutorQueueSize");
+ this.serializationExecutorQueueSize = serializationExecutorQueueSize;
+ }
+
public void setBuddyReplicationConfig(BuddyReplicationConfig config)
{
testImmutability("buddyReplicationConfig");
@@ -921,6 +945,24 @@
return serializationExecutorPoolSize;
}
+ /**
+ *
+ * @return the bounded queue size for async listeners
+ */
+ public int getListenerAsyncQueueSize()
+ {
+ return listenerAsyncQueueSize;
+ }
+
+ /**
+ *
+ * @return the bounded queue size for async serializers
+ */
+ public int getSerializationExecutorQueueSize()
+ {
+ return serializationExecutorQueueSize;
+ }
+
// ------------------------------------------------------------------------------------------------------------
// HELPERS
// ------------------------------------------------------------------------------------------------------------
@@ -977,6 +1019,8 @@
if (listenerAsyncPoolSize != that.listenerAsyncPoolSize) return false;
if (serializationExecutorPoolSize != that.serializationExecutorPoolSize) return false;
if (jgroupsConfigFile != that.jgroupsConfigFile) return false;
+ if (listenerAsyncQueueSize != that.listenerAsyncQueueSize) return false;
+ if (serializationExecutorQueueSize != that.serializationExecutorQueueSize) return false;
return true;
}
@@ -1018,6 +1062,9 @@
result = 31 * result + objectInputStreamPoolSize;
result = 31 * result + objectOutputStreamPoolSize;
result = 31 * result + serializationExecutorPoolSize;
+ result = 31 * result + listenerAsyncPoolSize;
+ result = 31 * result + serializationExecutorQueueSize;
+ result = 31 * result + listenerAsyncQueueSize;
result = 31 * result + (jgroupsConfigFile != null ? jgroupsConfigFile.hashCode() : 0);
return result;
}
Modified: core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java 2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java 2008-11-13 17:50:09 UTC (rev 7130)
@@ -257,17 +257,10 @@
{
if (element == null) return; //this element is optional
String asyncPoolSizeStr = getAttributeValue(element, "asyncPoolSize");
- if (existsAttribute(asyncPoolSizeStr))
- {
- try
- {
- config.setListenerAsyncPoolSize(Integer.parseInt(asyncPoolSizeStr));
- }
- catch (NumberFormatException nfe)
- {
- throw new ConfigurationException("Unable to parse the asyncPoolSize attribute of the listeners element. Was [" + asyncPoolSizeStr + "]");
- }
- }
+ if (existsAttribute(asyncPoolSizeStr)) config.setListenerAsyncPoolSize(getInt(asyncPoolSizeStr));
+
+ String asyncQueueSizeStr = getAttributeValue(element, "asyncQueueSize");
+ if (existsAttribute(asyncQueueSizeStr)) config.setListenerAsyncQueueSize(getInt(asyncQueueSizeStr));
}
private void configureInvocationBatching(Element element)
@@ -380,10 +373,15 @@
String replQueueInterval = getAttributeValue(element, "replQueueInterval");
if (existsAttribute(replQueueInterval)) config.setReplQueueInterval(getLong(replQueueInterval));
String replQueueMaxElements = getAttributeValue(element, "replQueueMaxElements");
+
if (existsAttribute(replQueueMaxElements)) config.setReplQueueMaxElements(getInt(replQueueMaxElements));
String serializationExecutorPoolSize = getAttributeValue(element, "serializationExecutorPoolSize");
if (existsAttribute(serializationExecutorPoolSize))
config.setSerializationExecutorPoolSize(getInt(serializationExecutorPoolSize));
+
+ String serializationExecutorQueueSize = getAttributeValue(element, "serializationExecutorQueueSize");
+ if (existsAttribute(serializationExecutorQueueSize))
+ config.setSerializationExecutorQueueSize(getInt(serializationExecutorQueueSize));
}
private void configureLocking(Element element)
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2008-11-13 17:50:09 UTC (rev 7130)
@@ -31,6 +31,7 @@
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.interceptors.InterceptorChain;
import org.jboss.cache.invocation.InvocationContextContainer;
+import org.jboss.cache.util.concurrent.BoundedExecutors;
import org.jboss.cache.util.concurrent.WithinThreadExecutor;
import org.jgroups.Address;
import org.jgroups.Channel;
@@ -48,7 +49,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -100,14 +100,15 @@
if (replicationProcessor == null)
{
replicationProcessorCount = new AtomicInteger(0);
- replicationProcessor = Executors.newFixedThreadPool(c.isUseReplQueue() ? 1 : c.getSerializationExecutorPoolSize(),
+
+ replicationProcessor = BoundedExecutors.newFixedThreadPool(c.isUseReplQueue() ? 1 : c.getSerializationExecutorPoolSize(),
new ThreadFactory()
{
public Thread newThread(Runnable r)
{
return new Thread(r, "AsyncReplicationProcessor-" + replicationProcessorCount.incrementAndGet());
}
- }
+ }, c.getSerializationExecutorQueueSize()
);
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java 2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java 2008-11-13 17:50:09 UTC (rev 7130)
@@ -40,6 +40,7 @@
import org.jboss.cache.notifications.event.*;
import static org.jboss.cache.notifications.event.Event.Type.*;
import org.jboss.cache.util.Immutables;
+import org.jboss.cache.util.concurrent.BoundedExecutors;
import org.jboss.cache.util.concurrent.WithinThreadExecutor;
import org.jgroups.View;
@@ -56,7 +57,6 @@
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
@@ -169,13 +169,13 @@
// create one if needed
if (config.getListenerAsyncPoolSize() > 0)
{
- asyncProcessor = Executors.newFixedThreadPool(config.getListenerAsyncPoolSize(), new ThreadFactory()
+ asyncProcessor = BoundedExecutors.newFixedThreadPool(config.getListenerAsyncPoolSize(), new ThreadFactory()
{
public Thread newThread(Runnable r)
{
return new Thread(r, "AsyncNotifier-" + asyncNotifierThreadNumber.getAndIncrement());
}
- });
+ }, config.getListenerAsyncQueueSize());
}
else
{
Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java 2008-11-13 17:50:09 UTC (rev 7130)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.cache.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Similar to JDK {@link java.util.concurrent.Executors} except that the factory methods here allow you to specify the
+ * size of the blocking queue that backs the executor.
+ *
+ * @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik AT jboss DOT org</a>)
+ * @since 3.0
+ */
+public class BoundedExecutors
+{
+ /**
+ * Creates a thread pool that reuses a fixed set of threads
+ * operating off a shared bounded queue. If any thread
+ * terminates due to a failure during execution prior to shutdown,
+ * a new one will take its place if needed to execute subsequent
+ * tasks.
+ *
+ * @param nThreads the number of threads in the pool
+ * @param boundedQueueSize size of the bounded queue
+ * @return the newly created thread pool
+ */
+ public static ExecutorService newFixedThreadPool(int nThreads, int boundedQueueSize)
+ {
+ return new ThreadPoolExecutor(nThreads, nThreads,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(boundedQueueSize));
+ }
+
+ /**
+ * Creates a thread pool that reuses a fixed set of threads
+ * operating off a shared bounded queue, using the provided
+ * ThreadFactory to create new threads when needed.
+ *
+ * @param nThreads the number of threads in the pool
+ * @param threadFactory the factory to use when creating new threads
+ * @param boundedQueueSize size of the bounded queue
+ * @return the newly created thread pool
+ */
+ public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory, int boundedQueueSize)
+ {
+ return new ThreadPoolExecutor(nThreads, nThreads,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(boundedQueueSize),
+ threadFactory);
+ }
+}
Modified: core/trunk/src/main/resources/config-samples/all.xml
===================================================================
--- core/trunk/src/main/resources/config-samples/all.xml 2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/resources/config-samples/all.xml 2008-11-13 17:50:09 UTC (rev 7130)
@@ -46,7 +46,8 @@
Used to define async listener notification thread pool size
-->
<listeners
- asyncPoolSize="1"/>
+ asyncPoolSize="1"
+ asyncQueueSize="1000000"/>
<!--
Used to enable invocation batching and allow the use of Cache.startBatch()/endBatch() methods.
@@ -83,7 +84,7 @@
<!--
Uncomment this for async replication.
-->
- <!-- <async useReplQueue="true" replQueueInterval="10000" replQueueMaxElements="500" serializationExecutorPoolSize="20" /> -->
+ <!--<async useReplQueue="true" replQueueInterval="10000" replQueueMaxElements="500" serializationExecutorPoolSize="20" serializationExecutorQueueSize="5000000"/>-->
<!-- Uncomment to use Buddy Replication -->
<!--
Modified: core/trunk/src/main/resources/schema/jbosscache-config-3.0.xsd
===================================================================
--- core/trunk/src/main/resources/schema/jbosscache-config-3.0.xsd 2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/main/resources/schema/jbosscache-config-3.0.xsd 2008-11-13 17:50:09 UTC (rev 7130)
@@ -142,6 +142,7 @@
<xs:complexType name="listenersType">
<xs:attribute name="asyncPoolSize" type="tns:positiveInteger"/>
+ <xs:attribute name="asyncQueueSize" type="tns:positiveInteger"/>
</xs:complexType>
<xs:complexType name="invocationBatchingType">
@@ -165,6 +166,7 @@
<xs:attribute name="replQueueInterval" type="tns:positiveInteger"/>
<xs:attribute name="replQueueMaxElements" type="tns:positiveInteger"/>
<xs:attribute name="serializationExecutorPoolSize" type="tns:positiveInteger"/>
+ <xs:attribute name="serializationExecutorQueueSize" type="tns:positiveInteger"/>
</xs:complexType>
<xs:complexType name="evictionType">
Modified: core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java 2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java 2008-11-13 17:50:09 UTC (rev 7130)
@@ -74,6 +74,7 @@
public void testAsyncSerializationExecutorSize()
{
assert asyncConfig.getSerializationExecutorPoolSize() == 250;
+ assert asyncConfig.getSerializationExecutorQueueSize() == 5000000;
}
public void testUseReplQueue()
@@ -284,6 +285,10 @@
public void testListenerAsyncThreads()
{
assert syncConfig.getListenerAsyncPoolSize() == 5;
+ assert syncConfig.getListenerAsyncQueueSize() == 1000000; // the default
+
+ assert asyncConfig.getListenerAsyncPoolSize() == 5;
+ assert asyncConfig.getListenerAsyncQueueSize() == 100000;
}
public void testInvocationBatching()
Modified: core/trunk/src/test/resources/configs/parser-test-async.xml
===================================================================
--- core/trunk/src/test/resources/configs/parser-test-async.xml 2008-11-13 05:21:36 UTC (rev 7129)
+++ core/trunk/src/test/resources/configs/parser-test-async.xml 2008-11-13 17:50:09 UTC (rev 7130)
@@ -16,7 +16,7 @@
<startup regionsInactiveOnStartup="true"/>
<shutdown hookBehavior="REGISTER"/>
- <listeners asyncPoolSize="5"/>
+ <listeners asyncPoolSize="5" asyncQueueSize="100000"/>
<invocationBatching enabled="true"/>
<!-- serialization related configuration, used for replication and cache loading -->
@@ -24,7 +24,7 @@
marshallerClass="some.Clazz" useLazyDeserialization="true" useRegionBasedMarshalling="true"/>
<clustering mode="replication" clusterName="JBossCache-cluster">
- <async useReplQueue="false" serializationExecutorPoolSize="250"/>
+ <async useReplQueue="false" serializationExecutorPoolSize="250" serializationExecutorQueueSize="5000000"/>
<stateRetrieval timeout="15124" fetchInMemoryState="true"/>
<buddy enabled="true" poolName="myBuddyPoolReplicationGroup" communicationTimeout="2000">
<dataGravitation auto="true" removeOnFind="true" searchBackupTrees="true"/>
More information about the jbosscache-commits
mailing list