[jbosscache-commits] JBoss Cache SVN: r7131 - in core/branches/flat/src/main/java/org/jboss: starobrno/config/parsing and 2 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Thu Nov 13 13:04:14 EST 2008
Author: manik.surtani at jboss.com
Date: 2008-11-13 13:04:13 -0500 (Thu, 13 Nov 2008)
New Revision: 7131
Added:
core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/config/parsing/XmlConfigurationParser.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java
core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java
Log:
JBCACHE-1443: Async marshalling, async cache loading and async notification handling use unbounded queues
Added: core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java 2008-11-13 18:04:13 UTC (rev 7131)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.cache.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Similar to JDK {@link java.util.concurrent.Executors} except that the factory methods here allow you to specify the
+ * size of the blocking queue that backs the executor.
+ *
+ * @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik AT jboss DOT org</a>)
+ * @since 3.0
+ */
+public class BoundedExecutors
+{
+ /**
+ * Creates a thread pool that reuses a fixed set of threads
+ * operating off a shared bounded queue. If any thread
+ * terminates due to a failure during execution prior to shutdown,
+ * a new one will take its place if needed to execute subsequent
+ * tasks.
+ *
+ * @param nThreads the number of threads in the pool
+ * @param boundedQueueSize size of the bounded queue
+ * @return the newly created thread pool
+ */
+ public static ExecutorService newFixedThreadPool(int nThreads, int boundedQueueSize)
+ {
+ return new ThreadPoolExecutor(nThreads, nThreads,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(boundedQueueSize));
+ }
+
+ /**
+ * Creates a thread pool that reuses a fixed set of threads
+ * operating off a shared bounded queue, using the provided
+ * ThreadFactory to create new threads when needed.
+ *
+ * @param nThreads the number of threads in the pool
+ * @param threadFactory the factory to use when creating new threads
+ * @param boundedQueueSize size of the bounded queue
+ * @return the newly created thread pool
+ */
+ public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory, int boundedQueueSize)
+ {
+ return new ThreadPoolExecutor(nThreads, nThreads,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(boundedQueueSize),
+ threadFactory);
+ }
+}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/config/parsing/XmlConfigurationParser.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/config/parsing/XmlConfigurationParser.java 2008-11-13 17:50:09 UTC (rev 7130)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/config/parsing/XmlConfigurationParser.java 2008-11-13 18:04:13 UTC (rev 7131)
@@ -229,17 +229,10 @@
{
if (element == null) return; //this element is optional
String asyncPoolSizeStr = getAttributeValue(element, "asyncPoolSize");
- if (existsAttribute(asyncPoolSizeStr))
- {
- try
- {
- config.setListenerAsyncPoolSize(Integer.parseInt(asyncPoolSizeStr));
- }
- catch (NumberFormatException nfe)
- {
- throw new ConfigurationException("Unable to parse the asyncPoolSize attribute of the listeners element. Was [" + asyncPoolSizeStr + "]");
- }
- }
+ if (existsAttribute(asyncPoolSizeStr)) config.setListenerAsyncPoolSize(getInt(asyncPoolSizeStr));
+
+ String asyncQueueSizeStr = getAttributeValue(element, "asyncQueueSize");
+ if (existsAttribute(asyncQueueSizeStr)) config.setListenerAsyncQueueSize(getInt(asyncQueueSizeStr));
}
private void configureInvocationBatching(Element element)
@@ -358,10 +351,15 @@
String replQueueInterval = getAttributeValue(element, "replQueueInterval");
if (existsAttribute(replQueueInterval)) config.setReplQueueInterval(getLong(replQueueInterval));
String replQueueMaxElements = getAttributeValue(element, "replQueueMaxElements");
+
if (existsAttribute(replQueueMaxElements)) config.setReplQueueMaxElements(getInt(replQueueMaxElements));
String serializationExecutorPoolSize = getAttributeValue(element, "serializationExecutorPoolSize");
if (existsAttribute(serializationExecutorPoolSize))
config.setSerializationExecutorPoolSize(getInt(serializationExecutorPoolSize));
+
+ String serializationExecutorQueueSize = getAttributeValue(element, "serializationExecutorQueueSize");
+ if (existsAttribute(serializationExecutorQueueSize))
+ config.setSerializationExecutorQueueSize(getInt(serializationExecutorQueueSize));
}
private void configureLocking(Element element)
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java 2008-11-13 17:50:09 UTC (rev 7130)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java 2008-11-13 18:04:13 UTC (rev 7131)
@@ -21,6 +21,7 @@
*/
package org.jboss.starobrno.marshall;
+import org.jboss.cache.util.concurrent.BoundedExecutors;
import org.jboss.cache.util.concurrent.WithinThreadExecutor;
import org.jboss.starobrno.commands.ReplicableCommand;
import org.jboss.starobrno.commands.VisitableCommand;
@@ -48,7 +49,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -100,14 +100,14 @@
if (replicationProcessor == null)
{
replicationProcessorCount = new AtomicInteger(0);
- replicationProcessor = Executors.newFixedThreadPool(c.isUseReplQueue() ? 1 : c.getSerializationExecutorPoolSize(),
+ replicationProcessor = BoundedExecutors.newFixedThreadPool(c.isUseReplQueue() ? 1 : c.getSerializationExecutorPoolSize(),
new ThreadFactory()
{
public Thread newThread(Runnable r)
{
return new Thread(r, "AsyncReplicationProcessor-" + replicationProcessorCount.incrementAndGet());
}
- }
+ }, c.getSerializationExecutorQueueSize()
);
}
}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java 2008-11-13 17:50:09 UTC (rev 7130)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java 2008-11-13 18:04:13 UTC (rev 7131)
@@ -24,6 +24,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.buddyreplication.BuddyGroup;
+import org.jboss.cache.util.concurrent.BoundedExecutors;
import org.jboss.cache.util.concurrent.WithinThreadExecutor;
import org.jboss.starobrno.Cache;
import org.jboss.starobrno.CacheException;
@@ -53,7 +54,6 @@
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
@@ -160,13 +160,13 @@
// create one if needed
if (config.getListenerAsyncPoolSize() > 0)
{
- asyncProcessor = Executors.newFixedThreadPool(config.getListenerAsyncPoolSize(), new ThreadFactory()
+ asyncProcessor = BoundedExecutors.newFixedThreadPool(config.getListenerAsyncPoolSize(), new ThreadFactory()
{
public Thread newThread(Runnable r)
{
return new Thread(r, "AsyncNotifier-" + asyncNotifierThreadNumber.getAndIncrement());
}
- });
+ }, config.getListenerAsyncQueueSize());
}
else
{
More information about the jbosscache-commits
mailing list