[jboss-cvs] JBossAS SVN: r99253 - in projects/jboss-threads/trunk/main: src/main/java/org/jboss/threads and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 11 18:14:53 EST 2010


Author: david.lloyd at jboss.com
Date: 2010-01-11 18:14:52 -0500 (Mon, 11 Jan 2010)
New Revision: 99253

Added:
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueueExecutor.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueuelessExecutor.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/StoppedExecutorException.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadCreationException.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/UninterruptibleExecutor.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedQueueThreadPoolExecutorMBean.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedThreadPoolExecutorMBean.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadExecutorMBean.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadPoolExecutorMBean.java
Removed:
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/RejectionPolicy.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/SimpleQueueExecutor.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPool.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPoolExecutorMBean.java
Modified:
   projects/jboss-threads/trunk/main/
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossExecutors.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThread.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/OrderedExecutor.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java
   projects/jboss-threads/trunk/main/src/test/java/org/jboss/threads/ThreadPoolTestCase.java
Log:
Revisions for management, proper thread pool types


Property changes on: projects/jboss-threads/trunk/main
___________________________________________________________________
Name: svn:ignore
   + target


Modified: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossExecutors.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossExecutors.java	2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossExecutors.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -24,6 +24,7 @@
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
@@ -45,15 +46,8 @@
     private JBossExecutors() {}
 
     private static final RuntimePermission MODIFY_THREAD_PERMISSION = new RuntimePermission("modifyThread");
-    private static final RuntimePermission CREATE_PRIVILEGED_THREAD_PERMISSION = new RuntimePermission("createPrivilegedThreads");
     private static final RuntimePermission COPY_CONTEXT_CLASSLOADER_PERMISSION = new RuntimePermission("copyClassLoader");
 
-    private static final AccessControlContext PRIVILEGED_CONTEXT = AccessController.doPrivileged(new PrivilegedAction<AccessControlContext>() {
-        public AccessControlContext run() {
-            return AccessController.getContext();
-        }
-    });
-
     private static final DirectExecutorService DIRECT_EXECUTOR_SERVICE = new DelegatingDirectExecutorService(SimpleDirectExecutor.INSTANCE);
     private static final DirectExecutorService REJECTING_EXECUTOR_SERVICE = new DelegatingDirectExecutorService(RejectingExecutor.INSTANCE);
     private static final DirectExecutorService DISCARDING_EXECUTOR_SERVICE = new DelegatingDirectExecutorService(DiscardingExecutor.INSTANCE);
@@ -174,18 +168,6 @@
     }
 
     /**
-     * Create an executor which executes tasks at the privilege level of this library.
-     * TODO - is this the best approach?
-     *
-     * @param delegate the executor to delegate to at the privileged level
-     * @return the new direct executor
-     */
-    static DirectExecutor highPrivilegeExecutor(final DirectExecutor delegate) {
-        checkAccess(CREATE_PRIVILEGED_THREAD_PERMISSION);
-        return privilegedExecutor(delegate, PRIVILEGED_CONTEXT);
-    }
-
-    /**
      * Create a direct executor which runs tasks with the given context class loader.
      *
      * @param delegate the executor to delegate to
@@ -266,7 +248,7 @@
     }
 
     /**
-     * Create an executor which runs the given initization task before running its given task.
+     * Create an executor which runs the given initialization task before running its given task.
      *
      * @param delegate the delegate direct executor
      * @param initializer the initialization task
@@ -331,9 +313,35 @@
      * @return the executor
      */
     public static Executor threadFactoryExecutor(final ThreadFactory factory) {
-        return new ThreadFactoryExecutor(factory);
+        return new ThreadFactoryExecutor("unnamed", factory, Integer.MAX_VALUE, false);
     }
 
+    /**
+     * Create an executor that executes each task in a new thread.  By default up to the given number of threads may run
+     * concurrently, after which new tasks will be rejected.
+     *
+     * @param factory the thread factory to use
+     * @param maxThreads the maximum number of allowed threads
+     * @return the executor
+     */
+    public static Executor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads) {
+        return new ThreadFactoryExecutor("unnamed", factory, maxThreads, false);
+    }
+
+    /**
+     * Create an executor that executes each task in a new thread.  By default up to the given number of threads may run
+     * concurrently, after which the caller will block or new tasks will be rejected, according to the setting of the
+     * {@code blocking} parameter.
+     *
+     * @param factory the thread factory to use
+     * @param maxThreads the maximum number of allowed threads
+     * @param blocking {@code true} if the submitter should block when the maximum number of threads has been reached
+     * @return the executor
+     */
+    public static Executor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads, final boolean blocking) {
+        return new ThreadFactoryExecutor("unnamed", factory, maxThreads, blocking);
+    }
+
     // ==================================================
     // REJECTED EXECUTION HANDLERS
     // ==================================================
@@ -639,6 +647,16 @@
         return new LoggingUncaughtExceptionHandler(log);
     }
 
+    /**
+     * Get an uncaught exception handler which logs to the given logger.
+     *
+     * @param categoryName the name of the logger category to log to
+     * @return the handler
+     */
+    public static Thread.UncaughtExceptionHandler loggingExceptionHandler(final String categoryName) {
+        return new LoggingUncaughtExceptionHandler(Logger.getLogger(categoryName));
+    }
+
     private static final Thread.UncaughtExceptionHandler LOGGING_HANDLER = loggingExceptionHandler(THREAD_ERROR_LOGGER);
 
     /**
@@ -749,6 +767,45 @@
     }
 
     /**
+     * Execute a task uninterruptibly.
+     *
+     * @param executor the executor to delegate to
+     * @param task the task to execute
+     * @throws RejectedExecutionException if the task was rejected by the executor
+     */
+    public static void executeUninterruptibly(Executor executor, Runnable task) throws RejectedExecutionException {
+        boolean intr = Thread.interrupted();
+        try {
+            for (;;) {
+                try {
+                    executor.execute(task);
+                    return;
+                } catch (ExecutionInterruptedException e) {
+                    intr |= Thread.interrupted();
+                }
+            }
+        } finally {
+            if (intr) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Get an executor which executes tasks uninterruptibly in the event of blocking.
+     *
+     * @param delegate the delegate executor
+     * @return the uninterruptible executor
+     */
+    public static Executor uninterruptibleExecutor(final Executor delegate) {
+        if (delegate instanceof UninterruptibleExecutor) {
+            return delegate;
+        } else {
+            return new UninterruptibleExecutor(delegate);
+        }
+    }
+
+    /**
      * Create a builder for a dependent task.
      *
      * @param executor the executor to use

Modified: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThread.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThread.java	2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThread.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -28,7 +28,8 @@
  * A JBoss thread.  Supports logging and extra operations.
  */
 public final class JBossThread extends Thread {
-    private static final Logger log = Logger.getLogger(JBossThread.class);
+    private static final Logger log = Logger.getLogger("org.jboss.threads");
+    private static final Logger ihlog = Logger.getLogger("org.jboss.threads.interrupt-handler");
 
     private volatile InterruptHandler interruptHandler;
     private ThreadNameInfo threadNameInfo;
@@ -97,7 +98,7 @@
      * handler is called from the <em>calling</em> thread, not the thread being interrupted.
      */
     public void interrupt() {
-        log.tracef("Interrupting thread \"%s\"", this);
+        ihlog.tracef("Interrupting thread \"%s\"", this);
         try {
             super.interrupt();
         } finally {
@@ -106,7 +107,7 @@
                 try {
                     interruptHandler.handleInterrupt(this);
                 } catch (Throwable t) {
-                    log.errorf(t, "Interrupt handler %s threw an exception", interruptHandler);
+                    ihlog.errorf(t, "Interrupt handler %s threw an exception", interruptHandler);
                 }
             }
         }

Modified: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java	2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -1,8 +1,8 @@
 /*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
  *
  * This is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as
@@ -28,7 +28,7 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.lang.reflect.Method;
+import org.jboss.threads.management.ThreadPoolExecutorMBean;
 
 /**
  *
@@ -62,67 +62,26 @@
 
     public void stop() {
         shutdown();
-        try {
-            awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
-            // todo log if fails?
-        } catch (InterruptedException e) {
-            // todo log it?
-            Thread.currentThread().interrupt();
-        }
     }
 
-    public void destroy() {
-        // todo is this the right behavior?
-        shutdownNow();
-        try {
-            awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
-            // todo log if fails?
-        } catch (InterruptedException e) {
-            // todo log it?
-            Thread.currentThread().interrupt();
-        }
+    public void execute(final Runnable command) {
+        super.execute(command);
     }
 
-    private static final Method GET_ALLOW_CORE_THREAD_TIMEOUT;
-    private static final Method SET_ALLOW_CORE_THREAD_TIMEOUT;
-
-    static {
-        Method method = null;
-        try {
-            method = ThreadPoolExecutor.class.getMethod("allowsCoreThreadTimeOut");
-        } catch (NoSuchMethodException e) {
-        }
-        GET_ALLOW_CORE_THREAD_TIMEOUT = method;
-        try {
-            method = ThreadPoolExecutor.class.getMethod("allowCoreThreadTimeOut", boolean.class);
-        } catch (NoSuchMethodException e) {
-        }
-        SET_ALLOW_CORE_THREAD_TIMEOUT = method;
-    }
-
     public String getName() {
         return name;
     }
 
+    public int getLargestThreadCount() {
+        return super.getLargestPoolSize();
+    }
+
     public boolean isAllowCoreThreadTimeout() {
-        final Method method = GET_ALLOW_CORE_THREAD_TIMEOUT;
-        try {
-            return method != null ? ((Boolean) method.invoke(this)).booleanValue() : false;
-        } catch (Exception e) {
-            return false;
-        }
+        return allowsCoreThreadTimeOut();
     }
 
     public void setAllowCoreThreadTimeout(final boolean allow) {
-        final Method method = SET_ALLOW_CORE_THREAD_TIMEOUT;
-        try {
-            if (method != null) {
-                method.invoke(this, Boolean.valueOf(allow));
-                return;
-            }
-        } catch (Exception e) {
-        }
-        throw new UnsupportedOperationException();
+        setAllowCoreThreadTimeout(allow);
     }
 
     public int getMaxPoolSize() {
@@ -141,7 +100,7 @@
         setKeepAliveTime(milliseconds, TimeUnit.MILLISECONDS);
     }
 
-    public int getCurrentPoolSize() {
+    public int getCurrentThreadCount() {
         return getPoolSize();
     }
 
@@ -157,6 +116,14 @@
         super.setRejectedExecutionHandler(new CountingRejectHandler(handler));
     }
 
+    public boolean isBlocking() {
+        return false;
+    }
+
+    public void setBlocking(final boolean blocking) {
+        throw new UnsupportedOperationException();
+    }
+
     private final class CountingRejectHandler implements RejectedExecutionHandler {
         private final RejectedExecutionHandler delegate;
 
@@ -170,6 +137,9 @@
 
         public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
             rejectCount.incrementAndGet();
+            if (isShutdown()) {
+                throw new StoppedExecutorException();
+            }
             delegate.rejectedExecution(r, executor);
         }
     }

Modified: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/OrderedExecutor.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/OrderedExecutor.java	2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/OrderedExecutor.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -22,10 +22,10 @@
 
 package org.jboss.threads;
 
+import java.util.ArrayDeque;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.Condition;
@@ -47,7 +47,7 @@
     // @protectedby lock
     private boolean running;
     // @protectedby lock
-    private RejectionPolicy policy;
+    private boolean blocking;
     // @protectedby lock
     private Executor handoffExecutor;
 
@@ -58,7 +58,7 @@
      * @param parent the parent to delegate tasks to
      */
     public OrderedExecutor(final Executor parent) {
-        this(parent, RejectionPolicy.BLOCK, null);
+        this(parent, new ArrayDeque<Runnable>());
     }
 
     /**
@@ -68,18 +68,28 @@
      * @param queue the queue to use to hold tasks
      */
     public OrderedExecutor(final Executor parent, final Queue<Runnable> queue) {
-        this(parent, queue, RejectionPolicy.BLOCK, null);
+        this(parent, queue, true, null);
     }
 
     /**
-     * Construct a new instance using an unbounded FIFO queue.
+     * Construct a new instance using a bounded FIFO queue of the given size and a blocking reject policy.
      *
+     * @param parent the parent to delegate tasks to
+     * @param queueLength the fixed length of the queue to use to hold tasks
+     */
+    public OrderedExecutor(final Executor parent, final int queueLength) {
+        this(parent, new ArrayQueue<Runnable>(queueLength), true, null);
+    }
+
+    /**
+     * Construct a new instance using a bounded FIFO queue of the given size and a handoff reject policy
+     *
      * @param parent the parent executor
-     * @param policy the task rejection policy
+     * @param queueLength the fixed length of the queue to use to hold tasks
      * @param handoffExecutor the executor to hand tasks to if the queue is full
      */
-    public OrderedExecutor(final Executor parent, final RejectionPolicy policy, final Executor handoffExecutor) {
-        this(parent, new LinkedList<Runnable>(), policy, handoffExecutor);
+    public OrderedExecutor(final Executor parent, final int queueLength, final Executor handoffExecutor) {
+        this(parent, new ArrayQueue<Runnable>(queueLength), false, handoffExecutor);
     }
 
     /**
@@ -87,25 +97,19 @@
      *
      * @param parent the parent executor
      * @param queue the task queue to use
-     * @param policy the task rejection policy
+     * @param blocking {@code true} if rejected tasks should block, {@code false} if rejected tasks should be handed off
      * @param handoffExecutor the executor to hand tasks to if the queue is full
      */
-    public OrderedExecutor(final Executor parent, final Queue<Runnable> queue, final RejectionPolicy policy, final Executor handoffExecutor) {
+    public OrderedExecutor(final Executor parent, final Queue<Runnable> queue, final boolean blocking, final Executor handoffExecutor) {
         if (parent == null) {
             throw new NullPointerException("parent is null");
         }
         if (queue == null) {
             throw new NullPointerException("queue is null");
         }
-        if (policy == null) {
-            throw new NullPointerException("policy is null");
-        }
-        if (policy == RejectionPolicy.HANDOFF && handoffExecutor == null) {
-            throw new NullPointerException("handoffExecutor is null");
-        }
         this.queue = queue;
         this.parent = parent;
-        this.policy = policy;
+        this.blocking = blocking;
         this.handoffExecutor = handoffExecutor;
     }
 
@@ -115,26 +119,21 @@
      * @param command the task to run.
      */
     public void execute(Runnable command) {
-        try {
-            lock.lockInterruptibly();
+        final Executor executor;
+        OUT: for (;;) {
+            lock.lock();
             try {
                 while (! queue.offer(command)) {
-                    switch (policy) {
-                        case ABORT:
-                            throw new RejectedExecutionException();
-                        case BLOCK:
+                    if (blocking) {
+                        try {
                             removeCondition.await();
-                            break;
-                        case DISCARD:
-                            return;
-                        case DISCARD_OLDEST:
-                            if (queue.poll() != null) {
-                                queue.add(command);
-                            }
-                            break;
-                        case HANDOFF:
-                            handoffExecutor.execute(command);
-                            return;
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new ExecutionInterruptedException();
+                        }
+                    } else {
+                        executor = handoffExecutor;
+                        break OUT;
                     }
                 }
                 if (! running) {
@@ -149,12 +148,14 @@
                         }
                     }
                 }
+                return;
             } finally {
                 lock.unlock();
             }
-        } catch (InterruptedException e) {
-            throw new RejectedExecutionException();
         }
+        if (executor != null) {
+            executor.execute(command);
+        }
     }
 
     private class Runner implements Runnable {

Copied: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueueExecutor.java (from rev 92149, projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/SimpleQueueExecutor.java)
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueueExecutor.java	                        (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueueExecutor.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,681 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.Queue;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+import org.jboss.logging.Logger;
+import org.jboss.threads.management.BoundedQueueThreadPoolExecutorMBean;
+
+/**
+ * An executor which uses a regular queue to hold tasks.  The executor may be tuned at runtime in many ways.
+ */
+public final class QueueExecutor extends AbstractExecutorService implements ExecutorService, BoundedQueueThreadPoolExecutorMBean {
+    private static final Logger log = Logger.getLogger("org.jboss.threads.executor");
+
+    private final String name;
+    private final Lock lock = new ReentrantLock();
+    // signal when a task is written to the queue
+    private final Condition enqueueCondition = lock.newCondition();
+    // signal when the queue is read
+    private final Condition removeCondition = lock.newCondition();
+    // signalled when threads terminate
+    private final Condition threadExitCondition = lock.newCondition();
+    private final ThreadFactory threadFactory;
+
+    // all protected by poolLock...
+    private int corePoolSize;
+    private int maxPoolSize;
+    private int largestPoolSize;
+    private int rejectCount;
+    private boolean allowCoreThreadTimeout;
+    private long keepAliveTime;
+    private TimeUnit keepAliveTimeUnit;
+    private boolean blocking;
+    private Executor handoffExecutor;
+
+    private int threadCount;
+    private Set<Thread> workers = new HashSet<Thread>();
+
+    private boolean stop;
+    private boolean interrupt;
+
+    private Queue<Runnable> queue;
+
+    /**
+     * Create a new instance.
+     *
+     * @param name the name of the executor
+     * @param corePoolSize the number of threads to create before enqueueing tasks
+     * @param maxPoolSize the maximum number of threads to create
+     * @param keepAliveTime the amount of time that an idle thread should remain active
+     * @param keepAliveTimeUnit the unit of time for {@code keepAliveTime}
+     * @param queue the queue to use for tasks
+     * @param threadFactory the thread factory to use for new threads
+     * @param blocking {@code true} if the executor should block when the queue is full and no threads are available, {@code false} to use the handoff executor
+     * @param handoffExecutor the executor which is called when blocking is disabled and a task cannot be accepted, or {@code null} to reject the task
+     */
+    public QueueExecutor(final String name, final int corePoolSize, final int maxPoolSize, final long keepAliveTime, final TimeUnit keepAliveTimeUnit, final Queue<Runnable> queue, final ThreadFactory threadFactory, final boolean blocking, final Executor handoffExecutor) {
+        this.name = name;
+        if (threadFactory == null) {
+            throw new NullPointerException("threadFactory is null");
+        }
+        if (queue == null) {
+            throw new NullPointerException("queue is null");
+        }
+        if (keepAliveTimeUnit == null) {
+            throw new NullPointerException("keepAliveTimeUnit is null");
+        }
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            this.threadFactory = threadFactory;
+            // configurable...
+            this.keepAliveTime = keepAliveTime;
+            this.keepAliveTimeUnit = keepAliveTimeUnit;
+            this.corePoolSize = corePoolSize;
+            this.maxPoolSize = maxPoolSize;
+            this.queue = queue;
+            this.blocking = blocking;
+            this.handoffExecutor = handoffExecutor;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Create a new instance.
+     *
+     * @param name the name of the executor
+     * @param corePoolSize the number of threads to create before enqueueing tasks
+     * @param maxPoolSize the maximum number of threads to create
+     * @param keepAliveTime the amount of time that an idle thread should remain active
+     * @param keepAliveTimeUnit the unit of time for {@code keepAliveTime}
+     * @param queueLength the fixed queue length to use for tasks
+     * @param threadFactory the thread factory to use for new threads
+     * @param blocking {@code true} if the executor should block when the queue is full and no threads are available, {@code false} to use the handoff executor
+     * @param handoffExecutor the executor which is called when blocking is disabled and a task cannot be accepted, or {@code null} to reject the task
+     */
+    public QueueExecutor(final String name, final int corePoolSize, final int maxPoolSize, final long keepAliveTime, final TimeUnit keepAliveTimeUnit, final int queueLength, final ThreadFactory threadFactory, final boolean blocking, final Executor handoffExecutor) {
+        this.name = name;
+        if (threadFactory == null) {
+            throw new NullPointerException("threadFactory is null");
+        }
+        if (queue == null) {
+            throw new NullPointerException("queue is null");
+        }
+        if (keepAliveTimeUnit == null) {
+            throw new NullPointerException("keepAliveTimeUnit is null");
+        }
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            this.threadFactory = threadFactory;
+            // configurable...
+            this.keepAliveTime = keepAliveTime;
+            this.keepAliveTimeUnit = keepAliveTimeUnit;
+            this.corePoolSize = corePoolSize;
+            this.maxPoolSize = maxPoolSize;
+            queue = new ArrayQueue<Runnable>(queueLength);
+            this.blocking = blocking;
+            this.handoffExecutor = handoffExecutor;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Execute a task.
+     *
+     * @param task the task to execute
+     * @throws RejectedExecutionException when a task is rejected by the handoff executor
+     * @throws StoppedExecutorException when the executor is terminating
+     * @throws ExecutionInterruptedException when blocking is enabled and the current thread is interrupted before a task could be accepted
+     */
+    public void execute(final Runnable task) throws RejectedExecutionException {
+        final Executor executor;
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            for (;;) {
+                if (stop) {
+                    throw new StoppedExecutorException("Executor is stopped");
+                }
+                // Try core thread first, then queue, then extra thread
+                final int count = threadCount;
+                if (count < corePoolSize) {
+                    startNewThread(task);
+                    threadCount = count + 1;
+                    return;
+                }
+                // next queue...
+                final Queue<Runnable> queue = this.queue;
+                if (queue.offer(task)) {
+                    enqueueCondition.signal();
+                    return;
+                }
+                // extra threads?
+                if (count < maxPoolSize) {
+                    startNewThread(task);
+                    threadCount = count + 1;
+                    return;
+                }
+                if (blocking) {
+                    try {
+                        removeCondition.await();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new ExecutionInterruptedException("Thread interrupted");
+                    }
+                } else {
+                    // delegate the task outside of the lock.
+                    rejectCount++;
+                    executor = handoffExecutor;
+                    break;
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+        if (executor != null) {
+            executor.execute(task);
+        }
+        return;
+    }
+
+    /** {@inheritDoc} */
+    public void shutdown() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            if (! stop) {
+                stop = true;
+                // wake up the whole town
+                removeCondition.signalAll();
+                enqueueCondition.signalAll();
+                for (Thread worker : workers) {
+                    worker.interrupt();
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    public List<Runnable> shutdownNow() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            stop = true;
+            removeCondition.signalAll();
+            enqueueCondition.signalAll();
+            for (Thread worker : workers) {
+                worker.interrupt();
+            }
+            final Queue<Runnable> queue = this.queue;
+            final ArrayList<Runnable> list = new ArrayList<Runnable>(queue);
+            queue.clear();
+            return list;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    public boolean isShutdown() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return stop;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    public boolean isTerminated() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return stop && threadCount == 0;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
+        final Lock lock = this.lock;
+        lock.lockInterruptibly();
+        try {
+            if (workers.contains(Thread.currentThread())) {
+                throw new IllegalStateException("Cannot await termination of a thread pool from one of its threads");
+            }
+            final long start = System.currentTimeMillis();
+            long elapsed = 0L;
+            while (! stop && threadCount > 0) {
+                final long remaining = timeout - elapsed;
+                if (remaining <= 0) {
+                    return false;
+                }
+                threadExitCondition.await(remaining, unit);
+                elapsed = unit.convert(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
+            }
+            return true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    public boolean isAllowCoreThreadTimeout() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return allowCoreThreadTimeout;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            this.allowCoreThreadTimeout = allowCoreThreadTimeout;
+            if (allowCoreThreadTimeout) {
+                // wake everyone up so core threads can time out
+                enqueueCondition.signalAll();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    public int getCorePoolSize() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return corePoolSize;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    public void setCorePoolSize(final int corePoolSize) {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            final int oldLimit = this.corePoolSize;
+            if (maxPoolSize < corePoolSize) {
+                // don't let the max thread limit be less than the core thread limit.
+                // the called method will signal as needed
+                setMaxPoolSize(corePoolSize);
+            } else if (oldLimit < corePoolSize) {
+                // we're growing the number of core threads
+                // therefore signal anyone waiting to add tasks; there might be more threads to add
+                removeCondition.signalAll();
+            } else if (oldLimit > corePoolSize) {
+                // we're shrinking the number of core threads
+                // therefore signal anyone waiting to remove tasks so the pool can shrink properly
+                enqueueCondition.signalAll();
+            } else {
+                // we aren't changing anything...
+                return;
+            }
+            this.corePoolSize = corePoolSize;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    public int getMaxPoolSize() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return maxPoolSize;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    public void setMaxPoolSize(final int maxPoolSize) {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            final int oldLimit = this.maxPoolSize;
+            if (maxPoolSize < corePoolSize) {
+                // don't let the max thread limit be less than the core thread limit.
+                // the called method will signal as needed
+                setCorePoolSize(maxPoolSize);
+            } else if (oldLimit < maxPoolSize) {
+                // we're growing the number of extra threads
+                // therefore signal anyone waiting to add tasks; there might be more threads to add
+                removeCondition.signalAll();
+            } else if (oldLimit > maxPoolSize) {
+                // we're shrinking the number of extra threads
+                // therefore signal anyone waiting to remove tasks so the pool can shrink properly
+                enqueueCondition.signalAll();
+            } else {
+                // we aren't changing anything...
+                return;
+            }
+            this.maxPoolSize = maxPoolSize;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    public long getKeepAliveTime() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return keepAliveTime;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Set the keep-alive time to the given amount of time.
+     *
+     * @param keepAliveTime the amount of time
+     * @param keepAliveTimeUnit the unit of time
+     */
+    public void setKeepAliveTime(final long keepAliveTime, final TimeUnit keepAliveTimeUnit) {
+        if (keepAliveTimeUnit == null) {
+            throw new NullPointerException("keepAliveTimeUnit is null");
+        }
+        if (keepAliveTime < 0L) {
+            throw new IllegalArgumentException("keepAliveTime is less than zero");
+        }
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            this.keepAliveTime = keepAliveTimeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    public void setKeepAliveTime(final long milliseconds) {
+        setKeepAliveTime(milliseconds, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Determine whether this thread pool executor is set to block when a task cannot be accepted immediately.
+     *
+     * @return {@code true} if blocking is enabled, {@code false} if the handoff executor is used
+     */
+    public boolean isBlocking() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return blocking;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Set whether this thread pool executor should be set to block when a task cannot be accepted immediately.
+     *
+     * @param blocking {@code true} if blocking is enabled, {@code false} if the handoff executor is used
+     */
+    public void setBlocking(boolean blocking) {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            this.blocking = blocking;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Get the handoff executor which is called when a task cannot be accepted immediately.
+     *
+     * @return the handoff executor
+     */
+    public Executor getHandoffExecutor() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return handoffExecutor;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Set the handoff executor which is called when a task cannot be accepted immediately.
+     *
+     * @param handoffExecutor the handoff executor
+     */
+    public void setHandoffExecutor(final Executor handoffExecutor) {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            this.handoffExecutor = handoffExecutor;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    // call with lock held!
+    private void startNewThread(final Runnable task) {
+        final Thread thread = threadFactory.newThread(new Worker(task));
+        workers.add(thread);
+        final int size = workers.size();
+        if (size > largestPoolSize) {
+            largestPoolSize = size;
+        }
+        thread.start();
+    }
+
+    // call with lock held!
+    private Runnable pollTask() {
+        final Runnable task = queue.poll();
+        if (task != null) {
+            removeCondition.signal();
+            return task;
+        } else {
+            if (-- threadCount == 0) {
+                threadExitCondition.signalAll();
+            }
+            return null;
+        }
+    }
+
+    // call with lock held!
+    private Runnable takeTask() {
+        final Condition removeCondition = this.removeCondition;
+        Runnable task = queue.poll();
+        if (task != null) {
+            removeCondition.signal();
+            return task;
+        } else {
+            final Condition enqueueCondition = this.enqueueCondition;
+            final long start = System.currentTimeMillis();
+            boolean intr = Thread.interrupted();
+            try {
+                long elapsed = 0L;
+                for (;;) {
+                    // these parameters may change on each iteration
+                    final int threadCount = this.threadCount;
+                    final int coreThreadLimit = corePoolSize;
+                    final boolean allowCoreThreadTimeout = this.allowCoreThreadTimeout;
+                    if (stop || threadCount > maxPoolSize) {
+                        // too many threads.  Handle a task if there is one, otherwise exit
+                        return pollTask();
+                    } else if (!allowCoreThreadTimeout && threadCount < coreThreadLimit) {
+                        // ignore timeout until we are not a core thread or until core threads are allowed to time out
+                        try {
+                            enqueueCondition.await();
+                        } catch (InterruptedException e) {
+                            intr = true;
+                        }
+                    } else {
+                        final TimeUnit timeUnit = keepAliveTimeUnit;
+                        final long time = keepAliveTime;
+                        final long remaining = time - timeUnit.convert(elapsed, TimeUnit.MILLISECONDS);
+                        if (remaining <= 0L && (allowCoreThreadTimeout || threadCount > coreThreadLimit)) {
+                            // the timeout has expired
+                            return pollTask();
+                        }
+                        try {
+                            enqueueCondition.await(remaining, timeUnit);
+                        } catch (InterruptedException e) {
+                            intr = true;
+                        }
+                    }
+                    task = queue.poll();
+                    if (task != null) {
+                        removeCondition.signal();
+                        return task;
+                    }
+                    elapsed = System.currentTimeMillis() - start;
+                }
+            } finally {
+                if (intr) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    public String getName() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    public int getCurrentThreadCount() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return workers.size();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    public int getLargestThreadCount() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return largestPoolSize;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    public int getRejectedCount() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return rejectCount;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void runTask(Runnable task) {
+        if (task != null) try {
+            task.run();
+        } catch (Throwable t) {
+            log.errorf(t, "Task execution failed for task %s", task);
+        }
+    }
+
+    private class Worker implements Runnable {
+
+        private volatile Runnable first;
+
+        public Worker(final Runnable command) {
+            first = command;
+        }
+
+        public void run() {
+            final Lock lock = QueueExecutor.this.lock;
+            try {
+                Runnable task = first;
+                // Release reference to task
+                first = null;
+                runTask(task);
+                for (;;) {
+                    // don't hang on to task while we possibly block waiting for the next one
+                    task = null;
+                    lock.lock();
+                    try {
+                        if (stop) {
+                            // drain queue
+                            if ((task = pollTask()) == null) {
+                                return;
+                            }
+                            Thread.currentThread().interrupt();
+                        } else {
+                            // get next task
+                            if ((task = takeTask()) == null) {
+                                return;
+                            }
+                        }
+                    } finally {
+                        lock.unlock();
+                    }
+                    runTask(task);
+                    Thread.interrupted();
+                }
+            } finally {
+                lock.lock();
+                try {
+                    workers.remove(Thread.currentThread());
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
+    }
+}

Copied: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueuelessExecutor.java (from rev 91991, projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPool.java)
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueuelessExecutor.java	                        (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueuelessExecutor.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,516 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.Set;
+import java.util.HashSet;
+import org.jboss.logging.Logger;
+import org.jboss.threads.management.BoundedThreadPoolExecutorMBean;
+
+/**
+ * A queueless thread pool.  If one or more threads are waiting for work when a task is submitted, it will be used.
+ * Otherwise, if fewer than the maximum threads are started, a new thread is created.
+ */
+public final class QueuelessExecutor extends AbstractExecutorService implements ExecutorService, BoundedThreadPoolExecutorMBean  {
+    private static final Logger log = Logger.getLogger("org.jboss.threads.executor");
+
+    private final String name;
+    private final ThreadFactory threadFactory;
+    private final DirectExecutor taskExecutor;
+
+    private final Lock lock = new ReentrantLock(false);
+    private final Condition runnableDequeued = lock.newCondition();
+    private final Condition nextReady = lock.newCondition();
+    private final Condition workerDequeued = lock.newCondition();
+    private final Condition taskEnqueued = lock.newCondition();
+    private final Condition threadDeath = lock.newCondition();
+
+    /**
+     * Protected by {@link #lock}
+     */
+    private final Set<Thread> runningThreads = new HashSet<Thread>(256);
+
+    /**
+     * Protected by {@link #lock}, signal {@link #runnableDequeued} on clear
+     */
+    private Runnable workRunnable;
+
+    /**
+     * Protected by {@link #lock}, signal {@link #workerDequeued} on clear
+     */
+    private Worker waitingWorker;
+
+    /**
+     * Configuration value.
+     * Protected by {@link #lock}
+     */
+    private long idleTimeout;
+
+    /**
+     * Configuration value.
+     * Protected by {@link #lock}
+     */
+    private int maxThreads;
+
+    /**
+     * Specify whether this executor blocks when no threads are available.
+     * Protected by {@link #lock}
+     */
+    private boolean blocking;
+
+    private Executor handoffExecutor;
+
+    private boolean stop;
+
+    //-- statistics --
+    private int largestPoolSize;
+    private int rejectedCount;
+
+    public QueuelessExecutor(final String name, final ThreadFactory threadFactory, final DirectExecutor taskExecutor, final Executor handoffExecutor, final long idleTimeout) {
+        this.name = name;
+        this.threadFactory = threadFactory;
+        this.taskExecutor = taskExecutor;
+        this.handoffExecutor = handoffExecutor;
+        this.idleTimeout = idleTimeout;
+    }
+
+    public long getIdleTimeout() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return idleTimeout;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void setIdleTimeout(final long idleTimeout) {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            this.idleTimeout = idleTimeout;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int getMaxThreads() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return maxThreads;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void setMaxThreads(final int maxThreads) {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            this.maxThreads = maxThreads;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int getRunningThreads() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return runningThreads.size();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public int getCorePoolSize() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return maxThreads;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void setCorePoolSize(final int newSize) {
+        if (newSize < 1) {
+            throw new IllegalArgumentException("Pool size must be at least 1");
+        }
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            maxThreads = newSize;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public long getKeepAliveTime() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return idleTimeout;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void setKeepAliveTime(final long milliseconds) {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            idleTimeout = milliseconds;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int getCurrentThreadCount() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return runningThreads.size();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int getLargestThreadCount() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return largestPoolSize;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int getRejectedCount() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return rejectedCount;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean isBlocking() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return blocking;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void setBlocking(final boolean blocking) {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            this.blocking = blocking;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public Executor getHandoffExecutor() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return handoffExecutor;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void setHandoffExecutor(final Executor handoffExecutor) {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            this.handoffExecutor = handoffExecutor;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void shutdown() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            if (! stop) {
+                for (Thread runningThread : runningThreads) {
+                    runningThread.interrupt();
+                }
+            }
+            stop = true;
+            // wake up all waiters
+            runnableDequeued.signalAll();
+            nextReady.signalAll();
+            workerDequeued.signalAll();
+            taskEnqueued.signalAll();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            if (! stop) {
+                throw new IllegalStateException("Not shut down");
+            }
+            final Date deadline = new Date(clipHigh(unit.toMillis(timeout) + System.currentTimeMillis()));
+            final Condition threadDeath = this.threadDeath;
+            while (! runningThreads.isEmpty() && threadDeath.awaitUntil(deadline));
+            return runningThreads.isEmpty();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public List<Runnable> shutdownNow() {
+        shutdown();
+        // tasks are never queued
+        return Collections.emptyList();
+    }
+
+    public boolean isShutdown() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return stop;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean isTerminated() {
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            return stop && runningThreads.isEmpty();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void execute(final Runnable command) {
+        final Executor executor;
+        final Set<Thread> runningThreads = this.runningThreads;
+        final Condition runnableDequeued = this.runnableDequeued;
+        final Lock lock = this.lock;
+        Runnable workRunnable;
+        lock.lock();
+        try {
+            for (;;) {
+                if (stop) {
+                    throw new StoppedExecutorException("Executor has been shut down");
+                }
+                final Worker waitingWorker;
+                if ((waitingWorker = this.waitingWorker) != null) {
+                    // a worker thread was waiting for a task; give it the task and wake it up
+                    waitingWorker.setRunnable(command);
+                    taskEnqueued.signal();
+                    this.waitingWorker = null;
+                    return;
+                }
+                // no worker thread was waiting
+                final int currentSize = runningThreads.size();
+                if (currentSize < maxThreads) {
+                    // if we haven't reached the thread limit yet, start up another thread
+                    final Thread thread = threadFactory.newThread(new Worker(command));
+                    if (thread == null) {
+                        throw new ThreadCreationException();
+                    }
+                    if (! runningThreads.add(thread)) {
+                        throw new ThreadCreationException("Unable to add new thread to the running set");
+                    }
+                    if (currentSize >= largestPoolSize) {
+                        largestPoolSize = currentSize + 1;
+                    }
+                    thread.start();
+                    return;
+                }
+                if (! blocking) {
+                    // not blocking, not accepted
+                    executor = handoffExecutor;
+                    rejectedCount++;
+                    // fall out to execute outside of lock
+                    break;
+                }
+                workRunnable = this.workRunnable;
+                if (workRunnable != null) {
+                    // someone else is waiting for a worker, so we wait for them
+                    try {
+                        nextReady.await();
+                        continue;
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new ExecutionInterruptedException();
+                    }
+                }
+                this.workRunnable = command;
+                try {
+                    runnableDequeued.await();
+                    if (this.workRunnable == null) {
+                        // task was accepted
+                        nextReady.signal();
+                        return;
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new ExecutionInterruptedException();
+                } finally {
+                    this.workRunnable = null;
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+        if (executor != null) {
+            executor.execute(command);
+        }
+    }
+
+    private static long clipHigh(long value) {
+        return value < 0 ? Long.MAX_VALUE : value;
+    }
+
+    private final class Worker implements Runnable {
+
+        private Runnable runnable;
+
+        private Worker(final Runnable runnable) {
+            this.runnable = runnable;
+        }
+
+        private void setRunnable(final Runnable runnable) {
+            this.runnable = runnable;
+        }
+
+        private boolean awaitTimed(Condition condition, long idleSince) {
+            final long end = clipHigh(System.currentTimeMillis() + idleTimeout);
+            long remaining = end - idleSince;
+            if (remaining < 0L) {
+                return false;
+            }
+            if (stop) return false;
+            try {
+                condition.await(remaining, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+            }
+            return ! stop;
+        }
+
+        public void run() {
+            final Lock lock = QueuelessExecutor.this.lock;
+            final Condition workerDequeued = QueuelessExecutor.this.workerDequeued;
+            final Condition runnableDequeued = QueuelessExecutor.this.runnableDequeued;
+            final Condition taskEnqueued = QueuelessExecutor.this.taskEnqueued;
+            final Set<Thread> runningThreads = QueuelessExecutor.this.runningThreads;
+            final DirectExecutor taskExecutor = QueuelessExecutor.this.taskExecutor;
+            final Thread thread = Thread.currentThread();
+            long idleSince = Long.MAX_VALUE;
+            Runnable runnable = this.runnable;
+            this.runnable = null;
+            try {
+                MAIN: for (;;) {
+
+                    // Run task
+                    try {
+                        taskExecutor.execute(runnable);
+                    } catch (Throwable t) {
+                        log.errorf(t, "Task execution failed for task %s", runnable);
+                    }
+                    idleSince = System.currentTimeMillis();
+                    // Get next task
+                    lock.lock();
+                    try {
+                        if (stop || runningThreads.size() > maxThreads) {
+                            if (runningThreads.remove(thread) && runningThreads.isEmpty()) {
+                                threadDeath.signalAll();
+                            }
+                            return;
+                        }
+                        if ((runnable = workRunnable) != null) {
+                            // there's a task, take it and continue on to the top of the loop
+                            workRunnable = null;
+                            runnableDequeued.signal();
+                        } else {
+                            // no executors are waiting, so we wait instead for an executor
+                            while (waitingWorker != null) {
+                                // wait... to wait
+                                if (! awaitTimed(workerDequeued, idleSince)) return;
+                                if ((runnable = workRunnable) != null) {
+                                    // sniped an easy one by luck!
+                                    continue MAIN;
+                                }
+                            }
+                            waitingWorker = this;
+                            try {
+                                do {
+                                    // wait for a job
+                                    if (! awaitTimed(taskEnqueued, idleSince)) return;
+                                } while ((runnable = this.runnable) == null);
+                            } finally {
+                                waitingWorker = null;
+                                workerDequeued.signal();
+                            }
+                        }
+                    } finally {
+                        lock.unlock();
+                    }
+                }
+            } finally {
+                lock.lock();
+                try {
+                    if (stop && runningThreads.remove(thread) && runningThreads.isEmpty()) {
+                        threadDeath.signalAll();
+                    }
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
+    }
+}

Deleted: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/RejectionPolicy.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/RejectionPolicy.java	2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/RejectionPolicy.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -1,51 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.threads;
-
-/**
- * Specify the way a task rejection should be handled.
- */
-public enum RejectionPolicy {
-
-    /**
-     * Abort execution with an exception.
-     */
-    ABORT,
-    /**
-     * Wait until there is room in the queue, or the calling thread is interrupted.
-     */
-    BLOCK,
-    /**
-     * Discard the excess task silently.
-     */
-    DISCARD,
-    /**
-     * Discard the oldest task in the queue silently, and enqueue the new task.  If the queue has no capacity
-     * then the new task will be discarded.
-     */
-    DISCARD_OLDEST,
-    /**
-     * Hand off the task to another executor.
-     */
-    HANDOFF,
-}

Deleted: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/SimpleQueueExecutor.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/SimpleQueueExecutor.java	2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/SimpleQueueExecutor.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -1,595 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.threads;
-
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.AbstractExecutorService;
-import java.util.concurrent.ExecutorService;
-import java.util.Queue;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Set;
-import java.util.HashSet;
-
-/**
- * An executor which uses a regular queue to hold tasks.  The executor may be tuned at runtime in many ways.
- */
-public final class SimpleQueueExecutor extends AbstractExecutorService implements ExecutorService, ThreadPoolExecutorMBean {
-    private final String name;
-    private final Lock lock = new ReentrantLock();
-    // signal when a task is written to the queue
-    private final Condition enqueueCondition = lock.newCondition();
-    // signal when the queue is read
-    private final Condition removeCondition = lock.newCondition();
-    // signalled when threads terminate
-    private final Condition threadExitCondition = lock.newCondition();
-    private final ThreadFactory threadFactory;
-
-    // all protected by poolLock...
-    private int corePoolSize;
-    private int maxPoolSize;
-    private int largestPoolSize;
-    private int rejectCount;
-    private boolean allowCoreThreadTimeout;
-    private long keepAliveTime;
-    private TimeUnit keepAliveTimeUnit;
-    private RejectionPolicy rejectionPolicy;
-    private Executor handoffExecutor;
-
-    private int threadCount;
-    private Set<Thread> workers = new HashSet<Thread>();
-
-    private boolean stop;
-    private boolean interrupt;
-
-    private Queue<Runnable> queue;
-
-    public SimpleQueueExecutor(final String name, final int corePoolSize, final int maxPoolSize, final long keepAliveTime, final TimeUnit keepAliveTimeUnit, final Queue<Runnable> queue, final ThreadFactory threadFactory, final RejectionPolicy rejectionPolicy, final Executor handoffExecutor) {
-        this.name = name;
-        if (threadFactory == null) {
-            throw new NullPointerException("threadFactory is null");
-        }
-        if (queue == null) {
-            throw new NullPointerException("queue is null");
-        }
-        if (keepAliveTimeUnit == null) {
-            throw new NullPointerException("keepAliveTimeUnit is null");
-        }
-        if (rejectionPolicy == null) {
-            throw new NullPointerException("rejectionPolicy is null");
-        }
-        if (rejectionPolicy == RejectionPolicy.HANDOFF && handoffExecutor == null) {
-            throw new NullPointerException("handoffExecutor is null");
-        }
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            this.threadFactory = threadFactory;
-            // configurable...
-            this.keepAliveTime = keepAliveTime;
-            this.keepAliveTimeUnit = keepAliveTimeUnit;
-            this.corePoolSize = corePoolSize;
-            this.maxPoolSize = maxPoolSize;
-            this.queue = queue;
-            this.rejectionPolicy = rejectionPolicy;
-            this.handoffExecutor = handoffExecutor;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void execute(final Runnable task) throws RejectedExecutionException {
-        final Lock lock = this.lock;
-        try {
-            lock.lockInterruptibly();
-            try {
-                for (;;) {
-                    if (stop) {
-                        throw new RejectedExecutionException("Executor is stopped");
-                    }
-                    // Try core thread first, then queue, then extra thread
-                    final int count = threadCount;
-                    if (count < corePoolSize) {
-                        startNewThread(task);
-                        threadCount = count + 1;
-                        return;
-                    }
-                    // next queue...
-                    final Queue<Runnable> queue = this.queue;
-                    if (queue.offer(task)) {
-                        enqueueCondition.signal();
-                        return;
-                    }
-                    // extra threads?
-                    if (count < maxPoolSize) {
-                        startNewThread(task);
-                        threadCount = count + 1;
-                        return;
-                    }
-                    rejectCount++;
-                    // how to reject the task...
-                    switch (rejectionPolicy) {
-                        case ABORT:
-                            throw new RejectedExecutionException("Executor is busy");
-                        case BLOCK:
-                            removeCondition.await();
-                            break;
-                        case DISCARD:
-                            return;
-                        case DISCARD_OLDEST:
-                            if (queue.poll() != null) {
-                                queue.add(task);
-                                enqueueCondition.signal();
-                            }
-                            return;
-                        case HANDOFF:
-                            handoffExecutor.execute(task);
-                            return;
-                    }
-                }
-            } finally {
-                lock.unlock();
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new ExecutionInterruptedException("Thread interrupted");
-        }
-    }
-
-    public void shutdown() {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            if (! stop) {
-                stop = true;
-                // wake up the whole town
-                removeCondition.signalAll();
-                enqueueCondition.signalAll();
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public List<Runnable> shutdownNow() {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            stop = true;
-            interrupt = true;
-            removeCondition.signalAll();
-            enqueueCondition.signalAll();
-            for (Thread worker : workers) {
-                worker.interrupt();
-            }
-            final Queue<Runnable> queue = this.queue;
-            final ArrayList<Runnable> list = new ArrayList<Runnable>(queue);
-            queue.clear();
-            return list;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public boolean isShutdown() {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            return stop;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public boolean isTerminated() {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            return stop && threadCount == 0;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
-        final Lock lock = this.lock;
-        lock.lockInterruptibly();
-        try {
-            if (workers.contains(Thread.currentThread())) {
-                throw new IllegalStateException("Cannot await termination of a thread pool from one of its threads");
-            }
-            final long start = System.currentTimeMillis();
-            long elapsed = 0L;
-            while (! stop && threadCount > 0) {
-                final long remaining = timeout - elapsed;
-                if (remaining <= 0) {
-                    return false;
-                }
-                threadExitCondition.await(remaining, unit);
-                elapsed = unit.convert(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
-            }
-            return true;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public boolean isAllowCoreThreadTimeout() {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            return allowCoreThreadTimeout;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            this.allowCoreThreadTimeout = allowCoreThreadTimeout;
-            if (allowCoreThreadTimeout) {
-                // wake everyone up so core threads can time out
-                enqueueCondition.signalAll();
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public int getCorePoolSize() {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            return corePoolSize;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void setCorePoolSize(final int corePoolSize) {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            final int oldLimit = this.corePoolSize;
-            if (maxPoolSize < corePoolSize) {
-                // don't let the max thread limit be less than the core thread limit.
-                // the called method will signal as needed
-                setMaxPoolSize(corePoolSize);
-            } else if (oldLimit < corePoolSize) {
-                // we're growing the number of core threads
-                // therefore signal anyone waiting to add tasks; there might be more threads to add
-                removeCondition.signalAll();
-            } else if (oldLimit > corePoolSize) {
-                // we're shrinking the number of core threads
-                // therefore signal anyone waiting to remove tasks so the pool can shrink properly
-                enqueueCondition.signalAll();
-            } else {
-                // we aren't changing anything...
-                return;
-            }
-            this.corePoolSize = corePoolSize;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public int getMaxPoolSize() {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            return maxPoolSize;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void setMaxPoolSize(final int maxPoolSize) {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            final int oldLimit = this.maxPoolSize;
-            if (maxPoolSize < corePoolSize) {
-                // don't let the max thread limit be less than the core thread limit.
-                // the called method will signal as needed
-                setCorePoolSize(maxPoolSize);
-            } else if (oldLimit < maxPoolSize) {
-                // we're growing the number of extra threads
-                // therefore signal anyone waiting to add tasks; there might be more threads to add
-                removeCondition.signalAll();
-            } else if (oldLimit > maxPoolSize) {
-                // we're shrinking the number of extra threads
-                // therefore signal anyone waiting to remove tasks so the pool can shrink properly
-                enqueueCondition.signalAll();
-            } else {
-                // we aren't changing anything...
-                return;
-            }
-            this.maxPoolSize = maxPoolSize;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public long getKeepAliveTime() {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            return keepAliveTime;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void setKeepAliveTime(final long keepAliveTime, final TimeUnit keepAliveTimeUnit) {
-        if (keepAliveTimeUnit == null) {
-            throw new NullPointerException("keepAliveTimeUnit is null");
-        }
-        if (keepAliveTime < 0L) {
-            throw new IllegalArgumentException("keepAliveTime is less than zero");
-        }
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            this.keepAliveTime = keepAliveTimeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void setKeepAliveTime(final long milliseconds) {
-        setKeepAliveTime(milliseconds, TimeUnit.MILLISECONDS);
-    }
-
-    public RejectionPolicy getRejectionPolicy() {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            return rejectionPolicy;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void setRejectionPolicy(final RejectionPolicy newPolicy, final Executor handoffExecutor) {
-        if (newPolicy == null) {
-            throw new NullPointerException("rejectionPolicy is null");
-        }
-        if (newPolicy == RejectionPolicy.HANDOFF && handoffExecutor == null) {
-            throw new NullPointerException("handoffExecutor is null");
-        }
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            if (rejectionPolicy == RejectionPolicy.BLOCK && newPolicy != RejectionPolicy.BLOCK) {
-                // there could be blocking .execute() calls out there; give them a nudge
-                removeCondition.signalAll();
-            }
-            rejectionPolicy = newPolicy;
-            this.handoffExecutor = handoffExecutor;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    // container lifecycle methods
-    public void stop() {
-        shutdown();
-        try {
-            awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
-            // todo log if fails?
-        } catch (InterruptedException e) {
-            // todo log it?
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    public void destroy() {
-        // todo is this the right behavior?
-        shutdownNow();
-        try {
-            awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
-            // todo log if fails?
-        } catch (InterruptedException e) {
-            // todo log it?
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    // call with lock held!
-    private void startNewThread(final Runnable task) {
-        final Thread thread = threadFactory.newThread(new Worker(task));
-        workers.add(thread);
-        final int size = workers.size();
-        if (size > largestPoolSize) {
-            largestPoolSize = size;
-        }
-        thread.start();
-    }
-
-    // call with lock held!
-    private Runnable pollTask() {
-        final Runnable task = queue.poll();
-        if (task != null) {
-            removeCondition.signal();
-            return task;
-        } else {
-            if (-- threadCount == 0) {
-                threadExitCondition.signalAll();
-            }
-            return null;
-        }
-    }
-
-    // call with lock held!
-    private Runnable takeTask() {
-        final Condition removeCondition = this.removeCondition;
-        Runnable task = queue.poll();
-        if (task != null) {
-            removeCondition.signal();
-            return task;
-        } else {
-            final Condition enqueueCondition = this.enqueueCondition;
-            final long start = System.currentTimeMillis();
-            boolean intr = Thread.interrupted();
-            try {
-                long elapsed = 0L;
-                for (;;) {
-                    // these parameters may change on each iteration
-                    final int threadCount = this.threadCount;
-                    final int coreThreadLimit = corePoolSize;
-                    final boolean allowCoreThreadTimeout = this.allowCoreThreadTimeout;
-                    if (stop || threadCount > maxPoolSize) {
-                        // too many threads.  Handle a task if there is one, otherwise exit
-                        return pollTask();
-                    } else if (!allowCoreThreadTimeout && threadCount < coreThreadLimit) {
-                        // ignore timeout until we are not a core thread or until core threads are allowed to time out
-                        try {
-                            enqueueCondition.await();
-                        } catch (InterruptedException e) {
-                            intr = true;
-                        }
-                    } else {
-                        final TimeUnit timeUnit = keepAliveTimeUnit;
-                        final long time = keepAliveTime;
-                        final long remaining = time - timeUnit.convert(elapsed, TimeUnit.MILLISECONDS);
-                        if (remaining <= 0L && (allowCoreThreadTimeout || threadCount > coreThreadLimit)) {
-                            // the timeout has expired
-                            return pollTask();
-                        }
-                        try {
-                            enqueueCondition.await(remaining, timeUnit);
-                        } catch (InterruptedException e) {
-                            intr = true;
-                        }
-                    }
-                    task = queue.poll();
-                    if (task != null) {
-                        removeCondition.signal();
-                        return task;
-                    }
-                    elapsed = System.currentTimeMillis() - start;
-                }
-            } finally {
-                if (intr) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public int getCurrentPoolSize() {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            return workers.size();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public int getLargestPoolSize() {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            return largestPoolSize;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public int getRejectedCount() {
-        final Lock lock = this.lock;
-        lock.lock();
-        try {
-            return rejectCount;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private class Worker implements Runnable {
-
-        private Runnable first;
-
-        public Worker(final Runnable command) {
-            first = command;
-        }
-
-        public void run() {
-            final Lock lock = SimpleQueueExecutor.this.lock;
-            Runnable task = first;
-            // Release reference to task
-            first = null;
-            lock.lock();
-            try {
-                for (;;) {
-                    if (task != null) {
-                        try {
-                            if (interrupt) {
-                                Thread.currentThread().interrupt();
-                            }
-                            lock.unlock();
-                            try {
-                                task.run();
-                            } finally {
-                                // this is OK because it's in the finally block after lock.unlock()
-                                //noinspection LockAcquiredButNotSafelyReleased
-                                lock.lock();
-                            }
-                        } catch (Throwable t) {
-                            // todo - log the exception perhaps
-                        }
-                        // don't hang on to task while we possibly block down below
-                        task = null;
-                    }
-                    if (stop) {
-                        // drain queue
-                        if ((task = pollTask()) == null) {
-                            return;
-                        }
-                    } else {
-                        // get next task
-                        if ((task = takeTask()) == null) {
-                            return;
-                        }
-                    }
-                }
-            } finally {
-                workers.remove(Thread.currentThread());
-                lock.unlock();
-            }
-        }
-    }
-}

Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/StoppedExecutorException.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/StoppedExecutorException.java	                        (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/StoppedExecutorException.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.threads;
+
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * Thrown when a task is submitted to an executor which is in the process of, or has completed shutting down.
+ */
+public final class StoppedExecutorException extends RejectedExecutionException {
+
+    private static final long serialVersionUID = 4815103522815471074L;
+
+    /**
+     * Constructs a {@code StoppedExecutorException} with no detail message. The cause is not initialized, and may
+     * subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+     */
+    public StoppedExecutorException() {
+    }
+
+    /**
+     * Constructs a {@code StoppedExecutorException} with the specified detail message. The cause is not initialized, and
+     * may subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+     *
+     * @param msg the detail message
+     */
+    public StoppedExecutorException(final String msg) {
+        super(msg);
+    }
+
+    /**
+     * Constructs a {@code StoppedExecutorException} with the specified cause. The detail message is set to:
+     * <pre>(cause == null ? null : cause.toString())</pre>
+     * (which typically contains the class and detail message of {@code cause}).
+     *
+     * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+     */
+    public StoppedExecutorException(final Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Constructs a {@code StoppedExecutorException} with the specified detail message and cause.
+     *
+     * @param msg the detail message
+     * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+     */
+    public StoppedExecutorException(final String msg, final Throwable cause) {
+        super(msg, cause);
+    }
+}

Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadCreationException.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadCreationException.java	                        (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadCreationException.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * Thrown when a thread factory refuses to create a thread for a thread pool.
+ */
+public final class ThreadCreationException extends RejectedExecutionException {
+
+    private static final long serialVersionUID = 5666802385744283783L;
+
+    /**
+     * Constructs a {@code ThreadCreationException} with no detail message. The cause is not initialized, and may
+     * subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+     */
+    public ThreadCreationException() {
+    }
+
+    /**
+     * Constructs a {@code ThreadCreationException} with the specified detail message. The cause is not initialized, and may
+     * subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+     *
+     * @param msg the detail message
+     */
+    public ThreadCreationException(final String msg) {
+        super(msg);
+    }
+
+    /**
+     * Constructs a {@code ThreadCreationException} with the specified cause. The detail message is set to:
+     * <pre>(cause == null ? null : cause.toString())</pre>
+     * (which typically contains the class and detail message of {@code cause}).
+     *
+     * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+     */
+    public ThreadCreationException(final Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Constructs a {@code ThreadCreationException} with the specified detail message and cause.
+     *
+     * @param msg the detail message
+     * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+     */
+    public ThreadCreationException(final String msg, final Throwable cause) {
+        super(msg, cause);
+    }
+}

Modified: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java	2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -23,25 +23,134 @@
 package org.jboss.threads;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.jboss.threads.management.ThreadExecutorMBean;
 
-class ThreadFactoryExecutor implements Executor {
+class ThreadFactoryExecutor implements Executor, ThreadExecutorMBean {
 
     private final ThreadFactory factory;
+    private final Semaphore limitSemaphore;
 
-    ThreadFactoryExecutor(final ThreadFactory factory) {
+    private final String name;
+    private final Object lock = new Object();
+    private int maxThreads;
+    private int largestThreadCount;
+    private int currentThreadCount;
+    private final AtomicInteger rejected = new AtomicInteger();
+    private volatile boolean blocking;
+
+    ThreadFactoryExecutor(final String name, final ThreadFactory factory, int maxThreads, boolean blocking) {
+        this.name = name;
         this.factory = factory;
+        this.maxThreads = maxThreads;
+        this.blocking = blocking;
+        limitSemaphore = new Semaphore(maxThreads);
     }
 
+    public int getMaxThreads() {
+        synchronized (lock) {
+            return maxThreads;
+        }
+    }
+
+    public void setMaxThreads(final int maxThreads) {
+        if (maxThreads < 0) {
+            throw new IllegalArgumentException("Max threads must not be negative");
+        }
+        synchronized (lock) {
+            final int old = this.maxThreads;
+            final int diff = old - maxThreads;
+            if (diff < 0) {
+                limitSemaphore.release(-diff);
+            } else if (diff > 0) {
+                if (! limitSemaphore.tryAcquire(diff)) {
+                    throw new IllegalArgumentException("Cannot reduce maximum threads below current number of running threads");
+                }
+            }
+            this.maxThreads = maxThreads;
+        }
+    }
+
     public void execute(final Runnable command) {
-        final Thread thread = factory.newThread(command);
-        if (thread == null) {
-            throw new RejectedExecutionException("No threads can be created");
+        try {
+            final Semaphore semaphore = limitSemaphore;
+            if (blocking) {
+                try {
+                    semaphore.acquire();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new ExecutionInterruptedException();
+                }
+            } else {
+                if (! semaphore.tryAcquire()) {
+                    throw new RejectedExecutionException("Task limit reached");
+                }
+            }
+            boolean ok = false;
+            try {
+                final Thread thread = factory.newThread(new Runnable() {
+                    public void run() {
+                        try {
+                            synchronized (lock) {
+                                int t = ++currentThreadCount;
+                                if (t > largestThreadCount) {
+                                    largestThreadCount = t;
+                                }
+                            }
+                            command.run();
+                            synchronized (lock) {
+                                currentThreadCount--;
+                            }
+                        } finally {
+                            limitSemaphore.release();
+                        }
+                    }
+                });
+                if (thread == null) {
+                    throw new ThreadCreationException("No threads can be created");
+                }
+                thread.start();
+                ok = true;
+            } finally {
+                if (! ok) semaphore.release();
+            }
+        } catch (RejectedExecutionException e) {
+            rejected.getAndIncrement();
+            throw e;
         }
-        thread.start();
     }
 
+    public boolean isBlocking() {
+        return blocking;
+    }
+
+    public void setBlocking(final boolean blocking) {
+        this.blocking = blocking;
+    }
+
+    public int getLargestThreadCount() {
+        synchronized (lock) {
+            return largestThreadCount;
+        }
+    }
+
+    public int getCurrentThreadCount() {
+        synchronized (lock) {
+            return currentThreadCount;
+        }
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public int getRejectedCount() {
+        return rejected.get();
+    }
+
     public String toString() {
         return String.format("%s (%s)", super.toString(), factory);
     }

Deleted: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPool.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPool.java	2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPool.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -1,207 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.threads;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
-
-/**
- * A queueless thread pool.  If one or more threads are waiting for work when a task is submitted, it will be used.
- * Otherwise, if fewer than the maximum threads are started, a new thread is created.
- */
-public final class ThreadPool {
-
-    private final ThreadFactory threadFactory;
-
-    private volatile long idleTimeout;
-    private int maxThreads;
-    private int runningThreads;
-
-    private Task parkedTask;
-    private final Lock poolLock = new ReentrantLock();
-    // signal when parking a task
-    private final Condition park = poolLock.newCondition();
-    // signal when unparking a task
-    private final Condition unpark = poolLock.newCondition();
-
-    private State state = State.RUNNING;
-
-    private enum State {
-        RUNNING,
-        KILL,
-    }
-
-    public ThreadPool(final ThreadFactory threadFactory) {
-        this.threadFactory = threadFactory;
-        idleTimeout = 30000L;
-        maxThreads = 20;
-        runningThreads = 0;
-    }
-
-    public void executeBlocking(final DirectExecutor taskExecutor, final Runnable runnable) throws InterruptedException {
-        if (runnable == null) {
-            throw new NullPointerException("runnable is null");
-        }
-        final Lock poolLock = this.poolLock;
-        final Condition park = this.park;
-        final Condition unpark = this.unpark;
-        Task task;
-        poolLock.lockInterruptibly();
-        try {
-            for (;;) {
-                if (state == State.KILL) {
-                    throw new RejectedExecutionException("Thread pool is shut down");
-                } else if ((task = parkedTask) != null) {
-                    parkedTask = null;
-                    unpark.signal();
-                    break;
-                } else {
-                    final int runningThreads = this.runningThreads;
-                    if (runningThreads < maxThreads) {
-                        task = new Task(runnable, taskExecutor);
-                        final ThreadFactory threadFactory = this.threadFactory;
-                        final Thread newThread = threadFactory.newThread(task);
-                        if (newThread == null) {
-                            throw new RejectedExecutionException("Thread factory " + threadFactory + " will not create a thread");
-                        }
-                        newThread.start();
-                        this.runningThreads = runningThreads + 1;
-                        return;
-                    } else {
-                        park.await();
-                    }
-                }
-            }
-        } finally {
-            poolLock.unlock();
-        }
-        synchronized (task) {
-            task.runnable = runnable;
-            task.taskExecutor = taskExecutor;
-            task.notify();
-        }
-    }
-
-    private final class Task implements Runnable {
-
-        /**
-         * Protected by {@code this}.
-         */
-        private Runnable runnable;
-        private DirectExecutor taskExecutor;
-
-        Task(final Runnable runnable, final DirectExecutor taskExecutor) {
-            synchronized (this) {
-                this.runnable = runnable;
-                this.taskExecutor = taskExecutor;
-            }
-        }
-
-        public void run() {
-            final Lock poolLock = ThreadPool.this.poolLock;
-            Runnable runnable;
-            DirectExecutor taskExecutor;
-            try {
-                synchronized (this) {
-                    runnable = this.runnable;
-                    taskExecutor = this.taskExecutor;
-                    this.runnable = null;
-                    this.taskExecutor = null;
-                }
-                long idleSince = runnable == null ? System.currentTimeMillis() : Long.MAX_VALUE;
-                for (;;) {
-                    while (runnable == null) {
-                        // no task; park
-                        poolLock.lock();
-                        try {
-                            if (state == State.KILL) {
-                                return;
-                            }
-                            // wait for the spot to open
-                            Task task;
-                            while ((task = parkedTask) != null && task != this) {
-                                try {
-                                    final long remaining = idleTimeout - (System.currentTimeMillis() - idleSince);
-                                    if (remaining < 0L) {
-                                        // idle timeout; don't bother finishing parking
-                                        return;
-                                    }
-                                    unpark.await(remaining, TimeUnit.MILLISECONDS);
-                                } catch (InterruptedException e) {
-                                    if (state == State.KILL) {
-                                        return;
-                                    }
-                                }
-                            }
-                            parkedTask = this;
-                        } finally {
-                            poolLock.unlock();
-                        }
-                        // parked!  Now just wait for a task to show up
-                        synchronized (this) {
-                            while ((runnable = this.runnable) == null) {
-                                try {
-                                    final long remaining = idleTimeout - (System.currentTimeMillis() - idleSince);
-                                    if (remaining < 0L) {
-                                        // idle timeout; unpark and return
-                                        return;
-                                    }
-                                    wait(remaining);
-                                } catch (InterruptedException e) {
-                                    break;
-                                }
-                            }
-                            taskExecutor = this.taskExecutor;
-                            this.runnable = null;
-                            this.taskExecutor = null;
-                        }
-                    }
-                    (taskExecutor == null ? JBossExecutors.directExecutor() : taskExecutor).execute(runnable);
-                    synchronized (this) {
-                        runnable = this.runnable;
-                        taskExecutor = this.taskExecutor;
-                        this.runnable = null;
-                        this.taskExecutor = null;
-                    }
-                    if (runnable == null) idleSince = System.currentTimeMillis();
-                }
-            } finally {
-                poolLock.lock();
-                try {
-                    // If this task is parked, unpark it so someone else can get in there
-                    if (parkedTask == this) {
-                        parkedTask = null;
-                        unpark.signal();
-                    }
-                    runningThreads--;
-                } finally {
-                    poolLock.unlock();
-                }
-            }
-        }
-    }
-}

Deleted: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPoolExecutorMBean.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPoolExecutorMBean.java	2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPoolExecutorMBean.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -1,52 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.threads;
-
-/**
- *
- */
-public interface ThreadPoolExecutorMBean {
-    String getName();
-
-    boolean isAllowCoreThreadTimeout();
-
-    void setAllowCoreThreadTimeout(boolean allow);
-
-    int getCorePoolSize();
-
-    void setCorePoolSize(int newSize);
-
-    int getMaxPoolSize();
-
-    void setMaxPoolSize(int newSize);
-
-    long getKeepAliveTime();
-
-    void setKeepAliveTime(long milliseconds);
-
-    int getCurrentPoolSize();
-
-    int getLargestPoolSize();
-
-    int getRejectedCount();
-}

Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/UninterruptibleExecutor.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/UninterruptibleExecutor.java	                        (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/UninterruptibleExecutor.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.concurrent.Executor;
+
+class UninterruptibleExecutor implements Executor {
+
+    private final Executor delegate;
+
+    UninterruptibleExecutor(final Executor delegate) {
+        this.delegate = delegate;
+    }
+
+    public void execute(final Runnable command) {
+        JBossExecutors.executeUninterruptibly(delegate, command);
+    }
+}

Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedQueueThreadPoolExecutorMBean.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedQueueThreadPoolExecutorMBean.java	                        (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedQueueThreadPoolExecutorMBean.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads.management;
+
+/**
+ *
+ */
+public interface BoundedQueueThreadPoolExecutorMBean extends BoundedThreadPoolExecutorMBean {
+    boolean isAllowCoreThreadTimeout();
+
+    void setAllowCoreThreadTimeout(boolean allow);
+
+    int getMaxPoolSize();
+
+    void setMaxPoolSize(int newSize);
+
+
+}

Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedThreadPoolExecutorMBean.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedThreadPoolExecutorMBean.java	                        (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedThreadPoolExecutorMBean.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,34 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads.management;
+
+/**
+ *
+ */
+public interface BoundedThreadPoolExecutorMBean extends ThreadPoolExecutorMBean {
+
+    boolean isBlocking();
+
+    void setBlocking(boolean blocking);
+
+}

Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadExecutorMBean.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadExecutorMBean.java	                        (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadExecutorMBean.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads.management;
+
+/**
+ *
+ */
+public interface ThreadExecutorMBean {
+    String getName();
+
+    int getRejectedCount();
+
+    int getCurrentThreadCount();
+
+    int getLargestThreadCount();
+}

Copied: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadPoolExecutorMBean.java (from rev 92149, projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPoolExecutorMBean.java)
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadPoolExecutorMBean.java	                        (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadPoolExecutorMBean.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads.management;
+
+/**
+ *
+ */
+public interface ThreadPoolExecutorMBean extends ThreadExecutorMBean {
+    int getCorePoolSize();
+
+    void setCorePoolSize(int newSize);
+
+    long getKeepAliveTime();
+
+    void setKeepAliveTime(long milliseconds);
+}

Modified: projects/jboss-threads/trunk/main/src/test/java/org/jboss/threads/ThreadPoolTestCase.java
===================================================================
--- projects/jboss-threads/trunk/main/src/test/java/org/jboss/threads/ThreadPoolTestCase.java	2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/test/java/org/jboss/threads/ThreadPoolTestCase.java	2010-01-11 23:14:52 UTC (rev 99253)
@@ -62,7 +62,7 @@
         final int cnt = 100;
         final CountDownLatch taskUnfreezer = new CountDownLatch(1);
         final CountDownLatch taskFinishLine = new CountDownLatch(cnt);
-        final ExecutorService simpleQueueExecutor = new SimpleQueueExecutor("test-pool", 5, 5, 500L, TimeUnit.MILLISECONDS, new LinkedList<Runnable>(), threadFactory, RejectionPolicy.BLOCK, null);
+        final ExecutorService simpleQueueExecutor = new QueueExecutor("test-pool", 5, 5, 500L, TimeUnit.MILLISECONDS, new LinkedList<Runnable>(), threadFactory, true, null);
         for (int i = 0; i < cnt; i ++) {
             simpleQueueExecutor.execute(new SimpleTask(taskUnfreezer, taskFinishLine));
         }
@@ -87,7 +87,7 @@
         final AtomicBoolean ran = new AtomicBoolean();
 
         final CountDownLatch startLatch = new CountDownLatch(1);
-        final ExecutorService simpleQueueExecutor = new SimpleQueueExecutor("test-pool", 5, 5, 500L, TimeUnit.MILLISECONDS, new LinkedList<Runnable>(), threadFactory, RejectionPolicy.BLOCK, null);
+        final ExecutorService simpleQueueExecutor = new QueueExecutor("test-pool", 5, 5, 500L, TimeUnit.MILLISECONDS, new LinkedList<Runnable>(), threadFactory, true, null);
         simpleQueueExecutor.execute(new Runnable() {
             public void run() {
                 try {
@@ -121,7 +121,7 @@
         final int cnt = queueSize + coreThreads + extraThreads;
         final CountDownLatch taskUnfreezer = new CountDownLatch(1);
         final CountDownLatch taskFinishLine = new CountDownLatch(cnt);
-        final ExecutorService simpleQueueExecutor = new SimpleQueueExecutor("test-pool", coreThreads, coreThreads + extraThreads, 500L, TimeUnit.MILLISECONDS, new ArrayQueue<Runnable>(queueSize), threadFactory, RejectionPolicy.BLOCK, null);
+        final ExecutorService simpleQueueExecutor = new QueueExecutor("test-pool", coreThreads, coreThreads + extraThreads, 500L, TimeUnit.MILLISECONDS, new ArrayQueue<Runnable>(queueSize), threadFactory, true, null);
         for (int i = 0; i < cnt; i ++) {
             simpleQueueExecutor.execute(new SimpleTask(taskUnfreezer, taskFinishLine));
         }




More information about the jboss-cvs-commits mailing list