[jbosscache-commits] JBoss Cache SVN: r7131 - in core/branches/flat/src/main/java/org/jboss: starobrno/config/parsing and 2 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Thu Nov 13 13:04:14 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-11-13 13:04:13 -0500 (Thu, 13 Nov 2008)
New Revision: 7131

Added:
   core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java
Modified:
   core/branches/flat/src/main/java/org/jboss/starobrno/config/parsing/XmlConfigurationParser.java
   core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java
   core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java
Log:
JBCACHE-1443:  Async marshalling, async cache loading and async notification handling use unbounded queues

Added: core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/BoundedExecutors.java	2008-11-13 18:04:13 UTC (rev 7131)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.cache.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Similar to JDK {@link java.util.concurrent.Executors} except that the factory methods here allow you to specify the
+ * size of the blocking queue that backs the executor.
+ *
+ * @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik AT jboss DOT org</a>)
+ * @since 3.0
+ */
+public class BoundedExecutors
+{
+   /**
+    * Creates a thread pool that reuses a fixed set of threads
+    * operating off a shared bounded queue. If any thread
+    * terminates due to a failure during execution prior to shutdown,
+    * a new one will take its place if needed to execute subsequent
+    * tasks.
+    *
+    * @param nThreads         the number of threads in the pool
+    * @param boundedQueueSize size of the bounded queue
+    * @return the newly created thread pool
+    */
+   public static ExecutorService newFixedThreadPool(int nThreads, int boundedQueueSize)
+   {
+      return new ThreadPoolExecutor(nThreads, nThreads,
+            0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(boundedQueueSize));
+   }
+
+   /**
+    * Creates a thread pool that reuses a fixed set of threads
+    * operating off a shared bounded queue, using the provided
+    * ThreadFactory to create new threads when needed.
+    *
+    * @param nThreads         the number of threads in the pool
+    * @param threadFactory    the factory to use when creating new threads
+    * @param boundedQueueSize size of the bounded queue
+    * @return the newly created thread pool
+    */
+   public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory, int boundedQueueSize)
+   {
+      return new ThreadPoolExecutor(nThreads, nThreads,
+            0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(boundedQueueSize),
+            threadFactory);
+   }
+}

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/config/parsing/XmlConfigurationParser.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/config/parsing/XmlConfigurationParser.java	2008-11-13 17:50:09 UTC (rev 7130)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/config/parsing/XmlConfigurationParser.java	2008-11-13 18:04:13 UTC (rev 7131)
@@ -229,17 +229,10 @@
    {
       if (element == null) return; //this element is optional
       String asyncPoolSizeStr = getAttributeValue(element, "asyncPoolSize");
-      if (existsAttribute(asyncPoolSizeStr))
-      {
-         try
-         {
-            config.setListenerAsyncPoolSize(Integer.parseInt(asyncPoolSizeStr));
-         }
-         catch (NumberFormatException nfe)
-         {
-            throw new ConfigurationException("Unable to parse the asyncPoolSize attribute of the listeners element.  Was [" + asyncPoolSizeStr + "]");
-         }
-      }
+      if (existsAttribute(asyncPoolSizeStr)) config.setListenerAsyncPoolSize(getInt(asyncPoolSizeStr));
+
+      String asyncQueueSizeStr = getAttributeValue(element, "asyncQueueSize");
+      if (existsAttribute(asyncQueueSizeStr)) config.setListenerAsyncQueueSize(getInt(asyncQueueSizeStr));
    }
 
    private void configureInvocationBatching(Element element)
@@ -358,10 +351,15 @@
       String replQueueInterval = getAttributeValue(element, "replQueueInterval");
       if (existsAttribute(replQueueInterval)) config.setReplQueueInterval(getLong(replQueueInterval));
       String replQueueMaxElements = getAttributeValue(element, "replQueueMaxElements");
+
       if (existsAttribute(replQueueMaxElements)) config.setReplQueueMaxElements(getInt(replQueueMaxElements));
       String serializationExecutorPoolSize = getAttributeValue(element, "serializationExecutorPoolSize");
       if (existsAttribute(serializationExecutorPoolSize))
          config.setSerializationExecutorPoolSize(getInt(serializationExecutorPoolSize));
+
+      String serializationExecutorQueueSize = getAttributeValue(element, "serializationExecutorQueueSize");
+      if (existsAttribute(serializationExecutorQueueSize))
+         config.setSerializationExecutorQueueSize(getInt(serializationExecutorQueueSize));
    }
 
    private void configureLocking(Element element)

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java	2008-11-13 17:50:09 UTC (rev 7130)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java	2008-11-13 18:04:13 UTC (rev 7131)
@@ -21,6 +21,7 @@
  */
 package org.jboss.starobrno.marshall;
 
+import org.jboss.cache.util.concurrent.BoundedExecutors;
 import org.jboss.cache.util.concurrent.WithinThreadExecutor;
 import org.jboss.starobrno.commands.ReplicableCommand;
 import org.jboss.starobrno.commands.VisitableCommand;
@@ -48,7 +49,6 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -100,14 +100,14 @@
          if (replicationProcessor == null)
          {
             replicationProcessorCount = new AtomicInteger(0);
-            replicationProcessor = Executors.newFixedThreadPool(c.isUseReplQueue() ? 1 : c.getSerializationExecutorPoolSize(),
+            replicationProcessor = BoundedExecutors.newFixedThreadPool(c.isUseReplQueue() ? 1 : c.getSerializationExecutorPoolSize(),
                   new ThreadFactory()
                   {
                      public Thread newThread(Runnable r)
                      {
                         return new Thread(r, "AsyncReplicationProcessor-" + replicationProcessorCount.incrementAndGet());
                      }
-                  }
+                  }, c.getSerializationExecutorQueueSize()
             );
          }
       }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java	2008-11-13 17:50:09 UTC (rev 7130)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java	2008-11-13 18:04:13 UTC (rev 7131)
@@ -24,6 +24,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.cache.buddyreplication.BuddyGroup;
+import org.jboss.cache.util.concurrent.BoundedExecutors;
 import org.jboss.cache.util.concurrent.WithinThreadExecutor;
 import org.jboss.starobrno.Cache;
 import org.jboss.starobrno.CacheException;
@@ -53,7 +54,6 @@
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -160,13 +160,13 @@
          // create one if needed
          if (config.getListenerAsyncPoolSize() > 0)
          {
-            asyncProcessor = Executors.newFixedThreadPool(config.getListenerAsyncPoolSize(), new ThreadFactory()
+            asyncProcessor = BoundedExecutors.newFixedThreadPool(config.getListenerAsyncPoolSize(), new ThreadFactory()
             {
                public Thread newThread(Runnable r)
                {
                   return new Thread(r, "AsyncNotifier-" + asyncNotifierThreadNumber.getAndIncrement());
                }
-            });
+            }, config.getListenerAsyncQueueSize());
          }
          else
          {




More information about the jbosscache-commits mailing list