[jboss-cvs] JBossAS SVN: r99842 - projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 22 16:55:39 EST 2010


Author: david.lloyd at jboss.com
Date: 2010-01-22 16:55:38 -0500 (Fri, 22 Jan 2010)
New Revision: 99842

Added:
   projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/BlockingExecutor.java
   projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/DelegatingBlockingExecutor.java
   projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ExecutionTimedOutException.java
Modified:
   projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossExecutors.java
   projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java
   projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/OrderedExecutor.java
   projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueueExecutor.java
   projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueuelessExecutor.java
   projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/StoppedExecutorException.java
   projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadCreationException.java
   projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java
Log:
JBTHR-10: add a blocking executor type for JCA WorkManager

Added: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/BlockingExecutor.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/BlockingExecutor.java	                        (rev 0)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/BlockingExecutor.java	2010-01-22 21:55:38 UTC (rev 99842)
@@ -0,0 +1,87 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An executor which can optionally block or not block on task submission.
+ */
+public interface BlockingExecutor extends Executor {
+
+    /**
+     * Executes the given command at some time in the future.  The command may execute in a new thread, in a pooled thread,
+     * or in the calling thread, at the discretion of the <tt>Executor</tt> implementation.  The call may block
+     * or not block, depending on the configuration of the executor.
+     *
+     * @param command the task to submit
+     *
+     * @throws ExecutionInterruptedException if the executor is configured to block, and the thread was interrupted while waiting
+     *              for the task to be accepted
+     * @throws StoppedExecutorException if the executor was shut down before the task was accepted
+     * @throws ThreadCreationException if a thread could not be created for some reason
+     * @throws RejectedExecutionException if execution is rejected for some other reason
+     * @throws NullPointerException if command is {@code null}
+     */
+    void execute(Runnable command);
+
+    /**
+     * Execute a task, blocking until it can be accepted, or until the calling thread is interrupted.
+     *
+     * @param task the task to submit
+     * @throws StoppedExecutorException if the executor was shut down before the task was accepted
+     * @throws ThreadCreationException if a thread could not be created for some reason
+     * @throws RejectedExecutionException if execution is rejected for some other reason
+     * @throws InterruptedException if the current thread was interrupted before the task could be accepted
+     * @throws NullPointerException if {@code task} is {@code null}
+     */
+    void executeBlocking(Runnable task) throws RejectedExecutionException, InterruptedException;
+
+    /**
+     * Execute a task, blocking until it can be accepted, a timeout elapses, or the calling thread is interrupted.
+     *
+     * @param task the task to submit
+     * @param timeout the amount of time to wait
+     * @param unit the unit of time
+     * @throws ExecutionTimedOutException if the timeout elapsed before a task could be accepted
+     * @throws StoppedExecutorException if the executor was shut down before the task was accepted
+     * @throws ThreadCreationException if a thread could not be created for some reason
+     * @throws RejectedExecutionException if execution is rejected for some other reason
+     * @throws InterruptedException if the current thread was interrupted before the task could be accepted
+     * @throws NullPointerException if {@code task} is {@code null}
+     */
+    void executeBlocking(Runnable task, long timeout, TimeUnit unit) throws RejectedExecutionException, InterruptedException;
+
+    /**
+     * Execute a task, without blocking.
+     *
+     * @param task the task to submit
+     * @throws StoppedExecutorException if the executor was shut down before the task was accepted
+     * @throws ThreadCreationException if a thread could not be created for some reason
+     * @throws RejectedExecutionException if execution is rejected for some other reason
+     * @throws NullPointerException if {@code task} is {@code null}
+     */
+    void executeNonBlocking(Runnable task) throws RejectedExecutionException;
+}

Copied: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/DelegatingBlockingExecutor.java (from rev 99685, projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/DelegatingExecutor.java)
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/DelegatingBlockingExecutor.java	                        (rev 0)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/DelegatingBlockingExecutor.java	2010-01-22 21:55:38 UTC (rev 99842)
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A blocking executor that simply delegates to another {@link BlockingExecutor}.  Use instances of this class to hide
+ * extra methods on another executor.
+ */
+class DelegatingBlockingExecutor implements BlockingExecutor {
+    private final BlockingExecutor delegate;
+
+    DelegatingBlockingExecutor(final BlockingExecutor delegate) {
+        this.delegate = delegate;
+    }
+
+    protected BlockingExecutor getDelegate() {
+        return delegate;
+    }
+
+    public void execute(final Runnable command) {
+        delegate.execute(command);
+    }
+
+    public void executeBlocking(final Runnable task) throws RejectedExecutionException, InterruptedException {
+        delegate.executeBlocking(task);
+    }
+
+    public void executeBlocking(final Runnable task, final long timeout, final TimeUnit unit) throws RejectedExecutionException, InterruptedException {
+        delegate.executeBlocking(task, timeout, unit);
+    }
+
+    public void executeNonBlocking(final Runnable task) throws RejectedExecutionException {
+        delegate.executeNonBlocking(task);
+    }
+
+    public String toString() {
+        return String.format("%s -> %s", super.toString(), delegate);
+    }
+}
\ No newline at end of file

Added: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ExecutionTimedOutException.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ExecutionTimedOutException.java	                        (rev 0)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ExecutionTimedOutException.java	2010-01-22 21:55:38 UTC (rev 99842)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * Thrown when an execute-with-timeout method is called and the timeout elapsed before a task could be <b>accepted</b>.
+ */
+public class ExecutionTimedOutException extends RejectedExecutionException {
+
+    private static final long serialVersionUID = 6577491781534695133L;
+
+    /**
+     * Constructs an {@code ExecutionTimedOutException} with no detail message. The cause is not initialized, and may
+     * subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+     */
+    public ExecutionTimedOutException() {
+    }
+
+    /**
+     * Constructs an {@code ExecutionTimedOutException} with the specified detail message. The cause is not initialized, and
+     * may subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+     *
+     * @param msg the detail message
+     */
+    public ExecutionTimedOutException(final String msg) {
+        super(msg);
+    }
+
+    /**
+     * Constructs an {@code ExecutionTimedOutException} with the specified cause. The detail message is set to:
+     * <pre>(cause == null ? null : cause.toString())</pre>
+     * (which typically contains the class and detail message of {@code cause}).
+     *
+     * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+     */
+    public ExecutionTimedOutException(final Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Constructs an {@code ExecutionTimedOutException} with the specified detail message and cause.
+     *
+     * @param msg the detail message
+     * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+     */
+    public ExecutionTimedOutException(final String msg, final Throwable cause) {
+        super(msg, cause);
+    }
+}

Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossExecutors.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossExecutors.java	2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossExecutors.java	2010-01-22 21:55:38 UTC (rev 99842)
@@ -324,7 +324,7 @@
      * @param factory the thread factory to use
      * @return the executor
      */
-    public static Executor threadFactoryExecutor(final ThreadFactory factory) {
+    public static BlockingExecutor threadFactoryExecutor(final ThreadFactory factory) {
         return new ThreadFactoryExecutor(factory, Integer.MAX_VALUE, false, directExecutor());
     }
 
@@ -336,7 +336,7 @@
      * @param maxThreads the maximum number of allowed threads
      * @return the executor
      */
-    public static Executor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads) {
+    public static BlockingExecutor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads) {
         return new ThreadFactoryExecutor(factory, maxThreads, false, directExecutor());
     }
 
@@ -350,7 +350,7 @@
      * @param blocking {@code true} if the submitter should block when the maximum number of threads has been reached
      * @return the executor
      */
-    public static Executor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads, final boolean blocking) {
+    public static BlockingExecutor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads, final boolean blocking) {
         return new ThreadFactoryExecutor(factory, maxThreads, blocking, directExecutor());
     }
 
@@ -365,7 +365,7 @@
      * @param taskExecutor the executor which should run each task
      * @return the executor
      */
-    public static Executor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads, final boolean blocking, final DirectExecutor taskExecutor) {
+    public static BlockingExecutor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads, final boolean blocking, final DirectExecutor taskExecutor) {
         return new ThreadFactoryExecutor(factory, maxThreads, blocking, taskExecutor);
     }
 
@@ -433,6 +433,10 @@
     // PROTECTED EXECUTOR SERVICE WRAPPERS
     // ==================================================
 
+    public static BlockingExecutor protectedBlockingExecutor(final BlockingExecutor target) {
+        return new DelegatingBlockingExecutor(target);
+    }
+
     /**
      * Wrap an executor with an {@code ExecutorService} instance which supports all the features of {@code ExecutorService}
      * except for shutting down the executor.

Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java	2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java	2010-01-22 21:55:38 UTC (rev 99842)
@@ -22,6 +22,7 @@
 
 package org.jboss.threads;
 
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.BlockingQueue;
@@ -33,7 +34,7 @@
 /**
  *
  */
-public final class JBossThreadPoolExecutor extends ThreadPoolExecutor implements BoundedQueueThreadPoolExecutorMBean {
+public final class JBossThreadPoolExecutor extends ThreadPoolExecutor implements BlockingExecutor, BoundedQueueThreadPoolExecutorMBean {
 
     private final AtomicInteger rejectCount = new AtomicInteger();
 
@@ -57,10 +58,22 @@
         setRejectedExecutionHandler(handler);
     }
 
-    public void execute(final Runnable command) {
-        super.execute(command);
+    public void execute(final Runnable task) {
+        super.execute(task);
     }
 
+    public void executeBlocking(final Runnable task) throws RejectedExecutionException, InterruptedException {
+        super.execute(task);
+    }
+
+    public void executeBlocking(final Runnable task, final long timeout, final TimeUnit unit) throws RejectedExecutionException, InterruptedException {
+        super.execute(task);
+    }
+
+    public void executeNonBlocking(final Runnable task) throws RejectedExecutionException {
+        super.execute(task);
+    }
+
     public int getLargestThreadCount() {
         return super.getLargestPoolSize();
     }

Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/OrderedExecutor.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/OrderedExecutor.java	2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/OrderedExecutor.java	2010-01-22 21:55:38 UTC (rev 99842)
@@ -23,9 +23,10 @@
 package org.jboss.threads;
 
 import java.util.ArrayDeque;
-import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.Condition;
@@ -36,7 +37,7 @@
  * More specifically, if a FIFO queue type is used, any call B to the {@link #execute(Runnable)} method that
  * happens-after another call A to the same method, will result in B's task running after A's.
  */
-public final class OrderedExecutor implements Executor {
+public final class OrderedExecutor implements BlockingExecutor {
 
     private final Executor parent;
     private final Runnable runner = new Runner();
@@ -125,31 +126,62 @@
         this(parent, new ArrayQueue<Runnable>(queueLength), blocking, handoffExecutor);
     }
 
-
-
     /**
      * Run a task.
      *
-     * @param command the task to run.
+     * @param task the task to run.
      */
-    public void execute(Runnable command) {
+    public void execute(Runnable task) {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
         Executor executor;
         OUT: for (;;) {
             lock.lock();
             try {
-                while (! queue.offer(command)) {
+                if (! running) {
+                    running = true;
+                    boolean ok = false;
+                    try {
+                        parent.execute(runner);
+                        ok = true;
+                    } finally {
+                        if (! ok) {
+                            running = false;
+                        }
+                    }
+                }
+                if (! queue.offer(task)) {
                     if (blocking) {
                         try {
                             removeCondition.await();
+                            continue;
                         } catch (InterruptedException e) {
                             Thread.currentThread().interrupt();
                             throw new ExecutionInterruptedException();
                         }
                     } else {
                         executor = handoffExecutor;
-                        break OUT;
+                        break;
                     }
                 }
+                return;
+            } finally {
+                lock.unlock();
+            }
+        }
+        if (executor != null) {
+            executor.execute(task);
+        }
+    }
+
+    public void executeBlocking(final Runnable task) throws RejectedExecutionException, InterruptedException {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
+        OUT: for (;;) {
+            lock.lock();
+            try {
                 if (! running) {
                     running = true;
                     boolean ok = false;
@@ -162,13 +194,90 @@
                         }
                     }
                 }
+                if (! queue.offer(task)) {
+                    removeCondition.await();
+                    continue;
+                }
                 return;
             } finally {
                 lock.unlock();
             }
         }
+    }
+
+    public void executeBlocking(final Runnable task, final long timeout, final TimeUnit unit) throws RejectedExecutionException, InterruptedException {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
+        long now = System.currentTimeMillis();
+        final long deadline = now + unit.toMillis(timeout);
+        if (deadline < 0L) {
+            executeBlocking(task);
+            return;
+        }
+        OUT: for (;;) {
+            lock.lock();
+            try {
+                if (! running) {
+                    running = true;
+                    boolean ok = false;
+                    try {
+                        parent.execute(runner);
+                        ok = true;
+                    } finally {
+                        if (! ok) {
+                            running = false;
+                        }
+                    }
+                }
+                if (! queue.offer(task)) {
+                    final long remaining = deadline - now;
+                    if (remaining <= 0L) {
+                        throw new ExecutionTimedOutException();
+                    }
+                    removeCondition.await(remaining, TimeUnit.MILLISECONDS);
+                    now = System.currentTimeMillis();
+                    continue;
+                }
+                return;
+            } finally {
+                lock.unlock();
+            }
+        }
+
+    }
+
+    public void executeNonBlocking(final Runnable task) throws RejectedExecutionException {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
+        Executor executor;
+        OUT: for (;;) {
+            lock.lock();
+            try {
+                if (! running) {
+                    running = true;
+                    boolean ok = false;
+                    try {
+                        parent.execute(runner);
+                        ok = true;
+                    } finally {
+                        if (! ok) {
+                            running = false;
+                        }
+                    }
+                }
+                if (! queue.offer(task)) {
+                    executor = handoffExecutor;
+                    break;
+                }
+                return;
+            } finally {
+                lock.unlock();
+            }
+        }
         if (executor != null) {
-            executor.execute(command);
+            executor.execute(task);
         }
     }
 

Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueueExecutor.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueueExecutor.java	2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueueExecutor.java	2010-01-22 21:55:38 UTC (rev 99842)
@@ -42,7 +42,7 @@
 /**
  * An executor which uses a regular queue to hold tasks.  The executor may be tuned at runtime in many ways.
  */
-public final class QueueExecutor extends AbstractExecutorService implements ExecutorService, BoundedQueueThreadPoolExecutorMBean {
+public final class QueueExecutor extends AbstractExecutorService implements ExecutorService, BlockingExecutor, BoundedQueueThreadPoolExecutorMBean {
     private static final Logger log = Logger.getLogger("org.jboss.threads.executor");
 
     private final Lock lock = new ReentrantLock();
@@ -155,6 +155,9 @@
      * @throws ExecutionInterruptedException when blocking is enabled and the current thread is interrupted before a task could be accepted
      */
     public void execute(final Runnable task) throws RejectedExecutionException {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
         final Executor executor;
         final Lock lock = this.lock;
         lock.lock();
@@ -205,6 +208,168 @@
         return;
     }
 
+    /**
+     * Execute a task, blocking until it can be accepted, or until the calling thread is interrupted.
+     *
+     * @param task the task to submit
+     *
+     * @throws org.jboss.threads.StoppedExecutorException if the executor was shut down before the task was accepted
+     * @throws org.jboss.threads.ThreadCreationException if a thread could not be created for some reason
+     * @throws java.util.concurrent.RejectedExecutionException if execution is rejected for some other reason
+     * @throws InterruptedException if the current thread was interrupted before the task could be accepted
+     * @throws NullPointerException if {@code task} is {@code null}
+     */
+    public void executeBlocking(final Runnable task) throws RejectedExecutionException, InterruptedException {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            for (;;) {
+                if (stop) {
+                    throw new StoppedExecutorException("Executor is stopped");
+                }
+                // Try core thread first, then queue, then extra thread
+                final int count = threadCount;
+                if (count < coreThreads) {
+                    startNewThread(task);
+                    threadCount = count + 1;
+                    return;
+                }
+                // next queue...
+                final Queue<Runnable> queue = this.queue;
+                if (queue.offer(task)) {
+                    enqueueCondition.signal();
+                    return;
+                }
+                // extra threads?
+                if (count < maxThreads) {
+                    startNewThread(task);
+                    threadCount = count + 1;
+                    return;
+                }
+                removeCondition.await();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Execute a task, blocking until it can be accepted, a timeout elapses, or the calling thread is interrupted.
+     *
+     * @param task the task to submit
+     * @param timeout the amount of time to wait
+     * @param unit the unit of time
+     *
+     * @throws org.jboss.threads.ExecutionTimedOutException if the timeout elapsed before a task could be accepted
+     * @throws org.jboss.threads.StoppedExecutorException if the executor was shut down before the task was accepted
+     * @throws org.jboss.threads.ThreadCreationException if a thread could not be created for some reason
+     * @throws java.util.concurrent.RejectedExecutionException if execution is rejected for some other reason
+     * @throws InterruptedException if the current thread was interrupted before the task could be accepted
+     * @throws NullPointerException if {@code task} is {@code null}
+     */
+    public void executeBlocking(final Runnable task, final long timeout, final TimeUnit unit) throws RejectedExecutionException, InterruptedException {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
+        long now = System.currentTimeMillis();
+        final long deadline = now + unit.toMillis(timeout);
+        if (deadline < 0L) {
+            executeBlocking(task);
+            return;
+        }
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            for (;;) {
+                if (stop) {
+                    throw new StoppedExecutorException("Executor is stopped");
+                }
+                // Try core thread first, then queue, then extra thread
+                final int count = threadCount;
+                if (count < coreThreads) {
+                    startNewThread(task);
+                    threadCount = count + 1;
+                    return;
+                }
+                // next queue...
+                final Queue<Runnable> queue = this.queue;
+                if (queue.offer(task)) {
+                    enqueueCondition.signal();
+                    return;
+                }
+                // extra threads?
+                if (count < maxThreads) {
+                    startNewThread(task);
+                    threadCount = count + 1;
+                    return;
+                }
+                final long remaining = deadline - now;
+                if (remaining <= 0L) {
+                    throw new ExecutionTimedOutException();
+                }
+                removeCondition.await(remaining, TimeUnit.MILLISECONDS);
+                now = System.currentTimeMillis();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Execute a task, without blocking.
+     *
+     * @param task the task to submit
+     *
+     * @throws org.jboss.threads.StoppedExecutorException if the executor was shut down before the task was accepted
+     * @throws org.jboss.threads.ThreadCreationException if a thread could not be created for some reason
+     * @throws java.util.concurrent.RejectedExecutionException if execution is rejected for some other reason
+     * @throws NullPointerException if {@code task} is {@code null}
+     */
+    public void executeNonBlocking(final Runnable task) throws RejectedExecutionException {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
+        final Executor executor;
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            if (stop) {
+                throw new StoppedExecutorException("Executor is stopped");
+            }
+            // Try core thread first, then queue, then extra thread
+            final int count = threadCount;
+            if (count < coreThreads) {
+                startNewThread(task);
+                threadCount = count + 1;
+                return;
+            }
+            // next queue...
+            final Queue<Runnable> queue = this.queue;
+            if (queue.offer(task)) {
+                enqueueCondition.signal();
+                return;
+            }
+            // extra threads?
+            if (count < maxThreads) {
+                startNewThread(task);
+                threadCount = count + 1;
+                return;
+            }
+            // delegate the task outside of the lock.
+            rejectCount++;
+            executor = handoffExecutor;
+        } finally {
+            lock.unlock();
+        }
+        if (executor != null) {
+            executor.execute(task);
+        }
+        return;
+    }
+
     /** {@inheritDoc} */
     public void shutdown() {
         final Lock lock = this.lock;
@@ -495,6 +660,9 @@
     // call with lock held!
     private void startNewThread(final Runnable task) {
         final Thread thread = threadFactory.newThread(new Worker(task));
+        if (thread == null) {
+            throw new ThreadCreationException();
+        }
         workers.add(thread);
         final int size = workers.size();
         if (size > largestPoolSize) {

Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueuelessExecutor.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueuelessExecutor.java	2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueuelessExecutor.java	2010-01-22 21:55:38 UTC (rev 99842)
@@ -43,7 +43,7 @@
  * A queueless thread pool.  If one or more threads are waiting for work when a task is submitted, it will be used.
  * Otherwise, if fewer than the maximum threads are started, a new thread is created.
  */
-public final class QueuelessExecutor extends AbstractExecutorService implements ExecutorService, BoundedThreadPoolExecutorMBean  {
+public final class QueuelessExecutor extends AbstractExecutorService implements ExecutorService, BlockingExecutor, BoundedThreadPoolExecutorMBean  {
     private static final Logger log = Logger.getLogger("org.jboss.threads.executor");
 
     private final ThreadFactory threadFactory;
@@ -279,7 +279,10 @@
         }
     }
 
-    public void execute(final Runnable command) {
+    public void execute(final Runnable task) {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
         final Executor executor;
         final Set<Thread> runningThreads = this.runningThreads;
         final Condition runnableDequeued = this.runnableDequeued;
@@ -294,7 +297,7 @@
                 final Worker waitingWorker;
                 if ((waitingWorker = this.waitingWorker) != null) {
                     // a worker thread was waiting for a task; give it the task and wake it up
-                    waitingWorker.setRunnable(command);
+                    waitingWorker.setRunnable(task);
                     taskEnqueued.signal();
                     this.waitingWorker = null;
                     return;
@@ -303,7 +306,7 @@
                 final int currentSize = runningThreads.size();
                 if (currentSize < maxThreads) {
                     // if we haven't reached the thread limit yet, start up another thread
-                    final Thread thread = threadFactory.newThread(new Worker(command));
+                    final Thread thread = threadFactory.newThread(new Worker(task));
                     if (thread == null) {
                         throw new ThreadCreationException();
                     }
@@ -334,7 +337,7 @@
                         throw new ExecutionInterruptedException();
                     }
                 }
-                this.workRunnable = command;
+                this.workRunnable = task;
                 try {
                     runnableDequeued.await();
                     if (this.workRunnable == null) {
@@ -353,12 +356,150 @@
             lock.unlock();
         }
         if (executor != null) {
-            executor.execute(command);
+            executor.execute(task);
         } else {
             throw new RejectedExecutionException();
         }
     }
 
+    /**
+     * Execute a task, blocking until it has been handed to a thread.  The task is given to a waiting
+     * worker if there is one, otherwise a new thread is started while fewer than {@code maxThreads} are
+     * running; failing both, the caller parks the task in the single {@code workRunnable} slot and waits
+     * for it to be taken.  Only one submitter may occupy that slot at a time; others wait on {@code nextReady}.
+     *
+     * @param task the task to submit
+     *
+     * @throws StoppedExecutorException if the executor has been shut down
+     * @throws ThreadCreationException if a new thread could not be created or registered
+     * @throws RejectedExecutionException if execution is rejected for some other reason
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws NullPointerException if task is {@code null}
+     */
+    public void executeBlocking(final Runnable task) throws RejectedExecutionException, InterruptedException {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
+        final Set<Thread> runningThreads = this.runningThreads;
+        final Condition runnableDequeued = this.runnableDequeued;
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            for (;;) {
+                // re-checked on every pass, since the state may have changed while we were waiting
+                if (stop) {
+                    throw new StoppedExecutorException("Executor has been shut down");
+                }
+                final Worker waitingWorker;
+                if ((waitingWorker = this.waitingWorker) != null) {
+                    // a worker thread was waiting for a task; give it the task and wake it up
+                    waitingWorker.setRunnable(task);
+                    taskEnqueued.signal();
+                    this.waitingWorker = null;
+                    return;
+                }
+                // no worker thread was waiting
+                final int currentSize = runningThreads.size();
+                if (currentSize < maxThreads) {
+                    // if we haven't reached the thread limit yet, start up another thread
+                    final Thread thread = threadFactory.newThread(new Worker(task));
+                    if (thread == null) {
+                        throw new ThreadCreationException();
+                    }
+                    if (! runningThreads.add(thread)) {
+                        throw new ThreadCreationException("Unable to add new thread to the running set");
+                    }
+                    if (currentSize >= largestPoolSize) {
+                        largestPoolSize = currentSize + 1;
+                    }
+                    thread.start();
+                    return;
+                }
+                if (this.workRunnable != null) {
+                    // someone else is waiting for a worker, so we wait for them
+                    nextReady.await();
+                    continue;
+                }
+                this.workRunnable = task;
+                try {
+                    runnableDequeued.await();
+                    if (this.workRunnable == null) {
+                        // the slot was emptied while we waited: task was accepted
+                        return;
+                    }
+                } finally {
+                    // Release the slot however we leave it (accepted, interrupted, or woken without the
+                    // task being taken) and wake the next submitter; otherwise a thread parked on
+                    // nextReady would never learn that the slot is free again.
+                    this.workRunnable = null;
+                    nextReady.signal();
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Execute a task, blocking for at most the given time until it has been handed to a thread.  The
+     * hand-off steps are the same as for the untimed variant: waiting worker, then a new thread while fewer
+     * than {@code maxThreads} are running, then the single {@code workRunnable} slot.  Every wait, including
+     * the wait for another submitter to vacate the slot, is bounded by the deadline.
+     *
+     * @param task the task to submit
+     * @param timeout the maximum time to wait
+     * @param unit the unit of {@code timeout}
+     *
+     * @throws StoppedExecutorException if the executor has been shut down
+     * @throws ThreadCreationException if a new thread could not be created or registered
+     * @throws ExecutionTimedOutException if the deadline passes before the task is accepted
+     * @throws RejectedExecutionException if execution is rejected for some other reason
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws NullPointerException if task is {@code null}
+     */
+    public void executeBlocking(final Runnable task, final long timeout, final TimeUnit unit) throws RejectedExecutionException, InterruptedException {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
+        long now = System.currentTimeMillis();
+        final long deadline = now + unit.toMillis(timeout);
+        if (deadline < 0L) {
+            // the deadline is negative (e.g. the addition overflowed for a huge timeout): wait without a limit
+            executeBlocking(task);
+            return;
+        }
+        final Set<Thread> runningThreads = this.runningThreads;
+        final Condition runnableDequeued = this.runnableDequeued;
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            for (;;) {
+                // re-checked on every pass, since the state may have changed while we were waiting
+                if (stop) {
+                    throw new StoppedExecutorException("Executor has been shut down");
+                }
+                final Worker waitingWorker;
+                if ((waitingWorker = this.waitingWorker) != null) {
+                    // a worker thread was waiting for a task; give it the task and wake it up
+                    waitingWorker.setRunnable(task);
+                    taskEnqueued.signal();
+                    this.waitingWorker = null;
+                    return;
+                }
+                // no worker thread was waiting
+                final int currentSize = runningThreads.size();
+                if (currentSize < maxThreads) {
+                    // if we haven't reached the thread limit yet, start up another thread
+                    final Thread thread = threadFactory.newThread(new Worker(task));
+                    if (thread == null) {
+                        throw new ThreadCreationException();
+                    }
+                    if (! runningThreads.add(thread)) {
+                        throw new ThreadCreationException("Unable to add new thread to the running set");
+                    }
+                    if (currentSize >= largestPoolSize) {
+                        largestPoolSize = currentSize + 1;
+                    }
+                    thread.start();
+                    return;
+                }
+                if (this.workRunnable != null) {
+                    // someone else is waiting for a worker, so we wait for them, but never past our deadline
+                    final long remaining = deadline - now;
+                    if (remaining <= 0L) {
+                        throw new ExecutionTimedOutException();
+                    }
+                    nextReady.await(remaining, TimeUnit.MILLISECONDS);
+                    now = System.currentTimeMillis();
+                    continue;
+                }
+                this.workRunnable = task;
+                try {
+                    final long remaining = deadline - now;
+                    if (remaining <= 0L) {
+                        throw new ExecutionTimedOutException();
+                    }
+                    runnableDequeued.await(remaining, TimeUnit.MILLISECONDS);
+                    now = System.currentTimeMillis();
+                    if (this.workRunnable == null) {
+                        // the slot was emptied while we waited: task was accepted
+                        return;
+                    }
+                    // otherwise loop: one more hand-off attempt is made before the deadline check fails
+                } finally {
+                    // Release the slot however we leave it (accepted, timed out, interrupted) and wake
+                    // the next submitter; otherwise a thread parked on nextReady would never learn
+                    // that the slot is free again.
+                    this.workRunnable = null;
+                    nextReady.signal();
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Execute a task without blocking.  The task is given to a waiting worker if there is one, otherwise a
+     * new thread is started while fewer than {@code maxThreads} are running.  If neither is possible the
+     * task is rejected immediately rather than being silently discarded.
+     *
+     * @param task the task to submit
+     *
+     * @throws StoppedExecutorException if the executor has been shut down
+     * @throws ThreadCreationException if a new thread could not be created or registered
+     * @throws RejectedExecutionException if no thread is available to take the task
+     * @throws NullPointerException if task is {@code null}
+     */
+    public void executeNonBlocking(final Runnable task) throws RejectedExecutionException {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
+        final Set<Thread> runningThreads = this.runningThreads;
+        final Lock lock = this.lock;
+        lock.lock();
+        try {
+            if (stop) {
+                throw new StoppedExecutorException("Executor has been shut down");
+            }
+            final Worker waitingWorker;
+            if ((waitingWorker = this.waitingWorker) != null) {
+                // a worker thread was waiting for a task; give it the task and wake it up
+                waitingWorker.setRunnable(task);
+                taskEnqueued.signal();
+                this.waitingWorker = null;
+                return;
+            }
+            // no worker thread was waiting
+            final int currentSize = runningThreads.size();
+            if (currentSize < maxThreads) {
+                // if we haven't reached the thread limit yet, start up another thread
+                final Thread thread = threadFactory.newThread(new Worker(task));
+                if (thread == null) {
+                    throw new ThreadCreationException();
+                }
+                if (! runningThreads.add(thread)) {
+                    throw new ThreadCreationException("Unable to add new thread to the running set");
+                }
+                if (currentSize >= largestPoolSize) {
+                    largestPoolSize = currentSize + 1;
+                }
+                thread.start();
+                return;
+            }
+        } finally {
+            lock.unlock();
+        }
+        // NOTE(review): execute() passes saturated submissions to a hand-off executor and may track a
+        // rejection statistic; neither is visible from this method, so the task is rejected directly.
+        // Confirm whether the hand-off executor should be consulted here as well.
+        throw new RejectedExecutionException("No thread is available to run the task");
+    }
+
     private static long clipHigh(long value) {
         return value < 0 ? Long.MAX_VALUE : value;
     }

Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/StoppedExecutorException.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/StoppedExecutorException.java	2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/StoppedExecutorException.java	2010-01-22 21:55:38 UTC (rev 99842)
@@ -26,7 +26,7 @@
 /**
  * Thrown when a task is submitted to an executor which is in the process of, or has completed shutting down.
  */
-public final class StoppedExecutorException extends RejectedExecutionException {
+public class StoppedExecutorException extends RejectedExecutionException {
 
     private static final long serialVersionUID = 4815103522815471074L;
 

Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadCreationException.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadCreationException.java	2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadCreationException.java	2010-01-22 21:55:38 UTC (rev 99842)
@@ -27,7 +27,7 @@
 /**
  * Thrown when a thread factory refuses to create a thread for a thread pool.
  */
-public final class ThreadCreationException extends RejectedExecutionException {
+public class ThreadCreationException extends RejectedExecutionException {
 
     private static final long serialVersionUID = 5666802385744283783L;
 

Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java	2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java	2010-01-22 21:55:38 UTC (rev 99842)
@@ -22,14 +22,14 @@
 
 package org.jboss.threads;
 
-import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.jboss.threads.management.BoundedThreadPoolExecutorMBean;
 
-class ThreadFactoryExecutor implements Executor, BoundedThreadPoolExecutorMBean {
+class ThreadFactoryExecutor implements BlockingExecutor, BoundedThreadPoolExecutorMBean {
 
     private final ThreadFactory factory;
     private final Semaphore limitSemaphore;
@@ -74,7 +74,10 @@
         }
     }
 
-    public void execute(final Runnable command) {
+    public void execute(final Runnable task) {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
         try {
             final Semaphore semaphore = limitSemaphore;
             if (blocking) {
@@ -100,7 +103,7 @@
                                     largestThreadCount = t;
                                 }
                             }
-                            taskExecutor.execute(command);
+                            taskExecutor.execute(task);
                             synchronized (lock) {
                                 currentThreadCount--;
                             }
@@ -123,6 +126,133 @@
         }
     }
 
+    /**
+     * Execute a task in a new thread from the factory, blocking until a permit can be acquired from the
+     * limit semaphore.  The permit is released when the thread's work ends, or immediately if the thread
+     * could not be created or started.
+     *
+     * @param task the task to submit
+     *
+     * @throws ThreadCreationException if the thread factory returned {@code null}
+     * @throws RejectedExecutionException if execution is rejected; such rejections are counted
+     * @throws InterruptedException if the calling thread is interrupted while waiting for a permit
+     * @throws NullPointerException if task is {@code null}
+     */
+    public void executeBlocking(final Runnable task) throws RejectedExecutionException, InterruptedException {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
+        try {
+            final Semaphore semaphore = limitSemaphore;
+            semaphore.acquire();
+            // tracks whether the permit's ownership passed to the started thread
+            boolean ok = false;
+            try {
+                final Thread thread = factory.newThread(new Runnable() {
+                    public void run() {
+                        try {
+                            synchronized (lock) {
+                                int t = ++currentThreadCount;
+                                if (t > largestThreadCount) {
+                                    largestThreadCount = t;
+                                }
+                            }
+                            try {
+                                taskExecutor.execute(task);
+                            } finally {
+                                // undo the increment even when the task throws, so the count cannot drift upward
+                                synchronized (lock) {
+                                    currentThreadCount--;
+                                }
+                            }
+                        } finally {
+                            limitSemaphore.release();
+                        }
+                    }
+                });
+                if (thread == null) {
+                    throw new ThreadCreationException("No threads can be created");
+                }
+                thread.start();
+                ok = true;
+            } finally {
+                // the thread never started, so it will never release the permit itself
+                if (! ok) semaphore.release();
+            }
+        } catch (RejectedExecutionException e) {
+            rejected.getAndIncrement();
+            throw e;
+        }
+    }
+
+    /**
+     * Execute a task in a new thread from the factory, waiting at most the given time for a permit from the
+     * limit semaphore.  The permit is released when the thread's work ends, or immediately if the thread
+     * could not be created or started.
+     *
+     * @param task the task to submit
+     * @param timeout the maximum time to wait for a permit
+     * @param unit the unit of {@code timeout}
+     *
+     * @throws ExecutionTimedOutException if no permit became available within the timeout
+     * @throws ThreadCreationException if the thread factory returned {@code null}
+     * @throws RejectedExecutionException if execution is rejected; such rejections are counted
+     * @throws InterruptedException if the calling thread is interrupted while waiting for a permit
+     * @throws NullPointerException if task is {@code null}
+     */
+    public void executeBlocking(final Runnable task, final long timeout, final TimeUnit unit) throws RejectedExecutionException, InterruptedException {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
+        try {
+            final Semaphore semaphore = limitSemaphore;
+            if (! semaphore.tryAcquire(timeout, unit)) {
+                throw new ExecutionTimedOutException();
+            }
+            // tracks whether the permit's ownership passed to the started thread
+            boolean ok = false;
+            try {
+                final Thread thread = factory.newThread(new Runnable() {
+                    public void run() {
+                        try {
+                            synchronized (lock) {
+                                int t = ++currentThreadCount;
+                                if (t > largestThreadCount) {
+                                    largestThreadCount = t;
+                                }
+                            }
+                            try {
+                                taskExecutor.execute(task);
+                            } finally {
+                                // undo the increment even when the task throws, so the count cannot drift upward
+                                synchronized (lock) {
+                                    currentThreadCount--;
+                                }
+                            }
+                        } finally {
+                            limitSemaphore.release();
+                        }
+                    }
+                });
+                if (thread == null) {
+                    throw new ThreadCreationException("No threads can be created");
+                }
+                thread.start();
+                ok = true;
+            } finally {
+                // the thread never started, so it will never release the permit itself
+                if (! ok) semaphore.release();
+            }
+        } catch (RejectedExecutionException e) {
+            rejected.getAndIncrement();
+            throw e;
+        }
+    }
+
+    /**
+     * Execute a task in a new thread from the factory without waiting: if no permit is immediately
+     * available from the limit semaphore the task is rejected.  The permit is released when the thread's
+     * work ends, or immediately if the thread could not be created or started.
+     *
+     * @param task the task to submit
+     *
+     * @throws ThreadCreationException if the thread factory returned {@code null}
+     * @throws RejectedExecutionException if no permit is available or execution is otherwise rejected;
+     *      such rejections are counted
+     * @throws NullPointerException if task is {@code null}
+     */
+    public void executeNonBlocking(final Runnable task) throws RejectedExecutionException {
+        if (task == null) {
+            throw new NullPointerException("task is null");
+        }
+        try {
+            final Semaphore semaphore = limitSemaphore;
+            if (! semaphore.tryAcquire()) {
+                throw new RejectedExecutionException("Task limit reached");
+            }
+            // tracks whether the permit's ownership passed to the started thread
+            boolean ok = false;
+            try {
+                final Thread thread = factory.newThread(new Runnable() {
+                    public void run() {
+                        try {
+                            synchronized (lock) {
+                                int t = ++currentThreadCount;
+                                if (t > largestThreadCount) {
+                                    largestThreadCount = t;
+                                }
+                            }
+                            try {
+                                taskExecutor.execute(task);
+                            } finally {
+                                // undo the increment even when the task throws, so the count cannot drift upward
+                                synchronized (lock) {
+                                    currentThreadCount--;
+                                }
+                            }
+                        } finally {
+                            limitSemaphore.release();
+                        }
+                    }
+                });
+                if (thread == null) {
+                    throw new ThreadCreationException("No threads can be created");
+                }
+                thread.start();
+                ok = true;
+            } finally {
+                // the thread never started, so it will never release the permit itself
+                if (! ok) semaphore.release();
+            }
+        } catch (RejectedExecutionException e) {
+            rejected.getAndIncrement();
+            throw e;
+        }
+    }
+
     public boolean isBlocking() {
         return blocking;
     }




More information about the jboss-cvs-commits mailing list